small improvements

This commit is contained in:
fbarbu15 2023-11-07 18:10:25 +02:00
parent 88b1fa3e90
commit 2810fa11cd
No known key found for this signature in database
GPG Key ID: D75221C8DEA22501
3 changed files with 28 additions and 12 deletions

View File

@ -16,12 +16,18 @@ class WakuNode:
self._log_path = os.path.join(LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._log_path = os.path.join(LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name) self._docker_manager = DockerManager(self._image_name)
self._container = None self._container = None
logger.debug("WakuNode instance initialized with log path %s", self._log_path)
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def start(self, **kwargs):
logger.debug("Starting Node...")
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip() self._ext_ip = self._docker_manager.generate_random_ext_ip()
self._ports = self._docker_manager.generate_ports() self._ports = self._docker_manager.generate_ports()
self._rest_port = self._ports[0] self._rest_port = self._ports[0]
self._rpc_port = self._ports[1] self._rpc_port = self._ports[1]
self._websocket_port = self._ports[2] self._websocket_port = self._ports[2]
logger.debug("WakuNode instance initialized with log path %s", self._log_path)
if PROTOCOL == "RPC": if PROTOCOL == "RPC":
self._api = RPC(self._rpc_port, self._image_name) self._api = RPC(self._rpc_port, self._image_name)
elif PROTOCOL == "REST": elif PROTOCOL == "REST":
@ -29,11 +35,6 @@ class WakuNode:
else: else:
raise ValueError(f"Unknown protocol: {PROTOCOL}") raise ValueError(f"Unknown protocol: {PROTOCOL}")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def start(self, **kwargs):
logger.debug("Starting Node...")
self._docker_manager.create_network()
default_args = { default_args = {
"listen-address": "0.0.0.0", "listen-address": "0.0.0.0",
"rpc": "true", "rpc": "true",

View File

@ -26,6 +26,10 @@ class StepsRelay:
@allure.step @allure.step
@retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True) @retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True)
def check_published_message_reaches_peer_with_retry(self, message):
self.check_published_message_reaches_peer(message)
@allure.step
def check_published_message_reaches_peer(self, message): def check_published_message_reaches_peer(self, message):
message.timestamp = int(time() * 1e9) message.timestamp = int(time() * 1e9)
self.node1.send_message(message, self.test_pubsub_topic) self.node1.send_message(message, self.test_pubsub_topic)

View File

@ -11,23 +11,34 @@ logger = logging.getLogger(__name__)
class TestRelayPublish(StepsRelay): class TestRelayPublish(StepsRelay):
def test_publish_with_various_payloads(self): def test_publish_with_various_payloads(self):
failed_payloads = [] failed_payloads = []
for payload in SAMPLE_INPUTS: for index, payload in enumerate(SAMPLE_INPUTS):
logger.debug("Running test with payload %s", payload["description"]) logger.debug("Running test with payload %s", payload["description"])
message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.test_content_topic) message = MessageRpcQuery(payload=to_base64(payload["value"]), contentTopic=self.test_content_topic)
try: try:
self.check_published_message_reaches_peer(message) # The node may require a warmup period for message processing to stabilize.
# Therefore, we use the retry function for the first payload to account for this warmup.
if index == 0:
self.check_published_message_reaches_peer_with_retry(message)
else:
self.check_published_message_reaches_peer(message)
except Exception as e: except Exception as e:
logger.error("Payload %s failed: %s", {payload["description"]}, {str(e)}) logger.error("Payload %s failed: %s", payload["description"], str(e))
failed_payloads.append(payload) failed_payloads.append(payload["description"])
assert not failed_payloads, f"Payloads failed: {failed_payloads}" assert not failed_payloads, f"Payloads failed: {failed_payloads}"
def test_publish_with_various_content_topics(self): def test_publish_with_various_content_topics(self):
failed_content_topics = [] failed_content_topics = []
for content_topic in SAMPLE_INPUTS: for index, content_topic in enumerate(SAMPLE_INPUTS):
logger.debug("Running test with content topic %s", content_topic["description"]) logger.debug("Running test with content topic %s", content_topic["description"])
message = MessageRpcQuery(payload=to_base64(self.test_payload), contentTopic=content_topic["value"]) message = MessageRpcQuery(payload=to_base64(self.test_payload), contentTopic=content_topic["value"])
try: try:
self.check_published_message_reaches_peer(message) # The node may require a warmup period for message processing to stabilize.
# Therefore, we use the retry function for the first payload to account for this warmup.
if index == 0:
self.check_published_message_reaches_peer_with_retry(message)
else:
self.check_published_message_reaches_peer(message)
except Exception as e: except Exception as e:
logger.error("ContentTopic %s failed: %s", {content_topic["description"]}, {str(e)}) logger.error("ContentTopic %s failed: %s", {content_topic["description"]}, {str(e)})
failed_content_topics.append(content_topic) failed_content_topics.append(content_topic)