chore: go waku connection issues (#33)

* add go-waku peers via api

* add go-waku peers via api

* fix add_node_peer

* fix add_node_peer

* fix add_node_peer

* fix add_node_peer

* move create_message to commons
This commit is contained in:
fbarbu15 2024-04-23 10:44:12 +03:00 committed by GitHub
parent c1d4c0cc6b
commit 69c8e3f4b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 61 additions and 86 deletions

View File

@ -1,8 +1,6 @@
import errno import errno
import json import json
import os import os
import string
import pytest import pytest
import requests import requests
from src.libs.common import delay from src.libs.common import delay

36
src/steps/common.py Normal file
View File

@ -0,0 +1,36 @@
import inspect
from time import time
import allure
import pytest
from tenacity import retry, stop_after_delay, wait_fixed
from src.libs.common import to_base64
from src.libs.custom_logger import get_custom_logger
logger = get_custom_logger(__name__)
class StepsCommon:
@pytest.fixture(scope="function", autouse=True)
def common_setup(self):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
if not hasattr(self, "test_payload"):
self.test_payload = "Default Payload"
if not hasattr(self, "test_content_topic"):
self.test_content_topic = "/test/1/default/proto"
@allure.step
@retry(stop=stop_after_delay(20), wait=wait_fixed(0.5), reraise=True)
def add_node_peer(self, node, multiaddr_list, shards=[0, 1, 2, 3, 4, 5, 6, 7, 8]):
if node.is_nwaku():
for multiaddr in multiaddr_list:
node.add_peers([multiaddr])
elif node.is_gowaku():
for multiaddr in multiaddr_list:
peer_info = {"multiaddr": multiaddr, "protocols": ["/vac/waku/relay/2.0.0"], "shards": shards}
node.add_peers(peer_info)
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message

View File

@ -9,12 +9,13 @@ from src.node.waku_message import WakuMessage
from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY from src.env_vars import NODE_1, NODE_2, ADDITIONAL_NODES, NODEKEY
from src.node.waku_node import WakuNode from src.node.waku_node import WakuNode
from tenacity import retry, stop_after_delay, wait_fixed from tenacity import retry, stop_after_delay, wait_fixed
from src.steps.common import StepsCommon
from src.test_data import VALID_PUBSUB_TOPICS from src.test_data import VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
class StepsFilter: class StepsFilter(StepsCommon):
test_pubsub_topic = VALID_PUBSUB_TOPICS[1] test_pubsub_topic = VALID_PUBSUB_TOPICS[1]
second_pubsub_topic = VALID_PUBSUB_TOPICS[2] second_pubsub_topic = VALID_PUBSUB_TOPICS[2]
another_cluster_pubsub_topic = "/waku/2/rs/2/2" another_cluster_pubsub_topic = "/waku/2/rs/2/2"
@ -38,8 +39,7 @@ class StepsFilter:
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
if self.node2.is_nwaku(): self.add_node_peer(self.node2, [self.multiaddr_with_id])
self.node2.add_peers([self.multiaddr_with_id])
self.main_nodes.append(self.node2) self.main_nodes.append(self.node2)
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@ -75,8 +75,7 @@ class StepsFilter:
for index, node in enumerate(nodes): for index, node in enumerate(nodes):
node = WakuNode(node, f"node{index + 3}_{self.test_id}") node = WakuNode(node, f"node{index + 3}_{self.test_id}")
node.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id) node.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id)
if node.is_nwaku(): self.add_node_peer(node, [self.multiaddr_with_id])
node.add_peers([self.multiaddr_with_id])
self.optional_nodes.append(node) self.optional_nodes.append(node)
@allure.step @allure.step
@ -110,17 +109,6 @@ class StepsFilter:
except Exception as ex: except Exception as ex:
assert "Bad Request" in str(ex) or "Not Found" in str(ex) or "couldn't find any messages" in str(ex) assert "Bad Request" in str(ex) or "Not Found" in str(ex) or "couldn't find any messages" in str(ex)
@allure.step
def wait_for_published_message_to_reach_filter_peer(
self, timeout_duration=120, time_between_retries=1, pubsub_topic=None, sender=None, peer_list=None
):
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(time_between_retries), reraise=True)
def publish_and_check_filter_peer():
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
self.check_published_message_reaches_filter_peer(message, pubsub_topic=pubsub_topic, sender=sender, peer_list=peer_list)
publish_and_check_filter_peer()
@allure.step @allure.step
def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None): def wait_for_subscriptions_on_main_nodes(self, content_topic_list, pubsub_topic=None):
if pubsub_topic is None: if pubsub_topic is None:
@ -215,9 +203,3 @@ class StepsFilter:
return node.get_filter_messages(content_topic) return node.get_filter_messages(content_topic)
else: else:
raise NotImplementedError("Not implemented for this node type") raise NotImplementedError("Not implemented for this node type")
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message

View File

@ -12,11 +12,12 @@ from src.env_vars import (
NODEKEY, NODEKEY,
) )
from src.node.waku_node import WakuNode from src.node.waku_node import WakuNode
from src.steps.common import StepsCommon
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
class StepsLightPush: class StepsLightPush(StepsCommon):
test_content_topic = "/myapp/1/latest/proto" test_content_topic = "/myapp/1/latest/proto"
test_pubsub_topic = "/waku/2/rs/0/0" test_pubsub_topic = "/waku/2/rs/0/0"
test_payload = "Light push works!!" test_payload = "Light push works!!"
@ -28,19 +29,13 @@ class StepsLightPush:
self.optional_nodes = [] self.optional_nodes = []
self.multiaddr_list = [] self.multiaddr_list = []
@allure.step
def add_node_peer(self, node):
if node.is_nwaku():
for multiaddr in self.multiaddr_list:
node.add_peers([multiaddr])
@allure.step @allure.step
def start_receiving_node(self, image, node_index, **kwargs): def start_receiving_node(self, image, node_index, **kwargs):
node = WakuNode(image, f"receiving_node{node_index}_{self.test_id}") node = WakuNode(image, f"receiving_node{node_index}_{self.test_id}")
node.start(**kwargs) node.start(**kwargs)
if kwargs["relay"] == "true": if kwargs["relay"] == "true":
self.main_receiving_nodes.extend([node]) self.main_receiving_nodes.extend([node])
self.add_node_peer(node) self.add_node_peer(node, self.multiaddr_list)
self.multiaddr_list.extend([node.get_multiaddr_with_id()]) self.multiaddr_list.extend([node.get_multiaddr_with_id()])
return node return node
@ -50,7 +45,7 @@ class StepsLightPush:
node.start(discv5_bootstrap_node=self.enr_uri, lightpushnode=self.multiaddr_list[0], **kwargs) node.start(discv5_bootstrap_node=self.enr_uri, lightpushnode=self.multiaddr_list[0], **kwargs)
if kwargs["relay"] == "true": if kwargs["relay"] == "true":
self.main_receiving_nodes.extend([node]) self.main_receiving_nodes.extend([node])
self.add_node_peer(node) self.add_node_peer(node, self.multiaddr_list)
return node return node
@allure.step @allure.step
@ -133,12 +128,6 @@ class StepsLightPush:
waku_message = WakuMessage(get_messages_response) waku_message = WakuMessage(get_messages_response)
waku_message.assert_received_message(payload["message"]) waku_message.assert_received_message(payload["message"])
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message
@allure.step @allure.step
def create_payload(self, pubsub_topic=None, message=None, **kwargs): def create_payload(self, pubsub_topic=None, message=None, **kwargs):
if message is None: if message is None:

View File

@ -16,12 +16,13 @@ from src.env_vars import (
) )
from src.node.waku_node import WakuNode, rln_credential_store_ready from src.node.waku_node import WakuNode, rln_credential_store_ready
from tenacity import retry, stop_after_delay, wait_fixed from tenacity import retry, stop_after_delay, wait_fixed
from src.steps.common import StepsCommon
from src.test_data import VALID_PUBSUB_TOPICS from src.test_data import VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
class StepsRelay: class StepsRelay(StepsCommon):
test_pubsub_topic = VALID_PUBSUB_TOPICS[1] test_pubsub_topic = VALID_PUBSUB_TOPICS[1]
test_content_topic = "/test/1/waku-relay/proto" test_content_topic = "/test/1/waku-relay/proto"
test_payload = "Relay works!!" test_payload = "Relay works!!"
@ -41,8 +42,7 @@ class StepsRelay:
self.multiaddr_with_id = self.node1.get_multiaddr_with_id() self.multiaddr_with_id = self.node1.get_multiaddr_with_id()
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri) self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri)
if self.node2.is_nwaku(): self.add_node_peer(self.node2, [self.multiaddr_with_id])
self.node2.add_peers([self.multiaddr_with_id])
self.main_nodes.extend([self.node1, self.node2]) self.main_nodes.extend([self.node1, self.node2])
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@ -64,8 +64,7 @@ class StepsRelay:
self.node2.start( self.node2.start(
relay="true", discv5_bootstrap_node=self.enr_uri, rln_creds_source=RLN_CREDENTIALS, rln_creds_id="2", rln_relay_membership_index="1" relay="true", discv5_bootstrap_node=self.enr_uri, rln_creds_source=RLN_CREDENTIALS, rln_creds_id="2", rln_relay_membership_index="1"
) )
if self.node2.is_nwaku(): self.add_node_peer(self.node2, [self.multiaddr_with_id])
self.node2.add_peers([self.multiaddr_with_id])
self.main_nodes.extend([self.node1, self.node2]) self.main_nodes.extend([self.node1, self.node2])
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@ -78,8 +77,7 @@ class StepsRelay:
for index, node in enumerate(nodes): for index, node in enumerate(nodes):
node = WakuNode(node, f"node{index + 3}_{request.cls.test_id}") node = WakuNode(node, f"node{index + 3}_{request.cls.test_id}")
node.start(relay="true", discv5_bootstrap_node=self.enr_uri) node.start(relay="true", discv5_bootstrap_node=self.enr_uri)
if node.is_nwaku(): self.add_node_peer(node, [self.multiaddr_with_id])
node.add_peers([self.multiaddr_with_id])
self.optional_nodes.append(node) self.optional_nodes.append(node)
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@ -153,12 +151,6 @@ class StepsRelay:
for node in node_list: for node in node_list:
node.delete_relay_subscriptions(pubsub_topic_list) node.delete_relay_subscriptions(pubsub_topic_list)
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message
@allure.step @allure.step
@retry(stop=stop_after_delay(120), wait=wait_fixed(1), reraise=True) @retry(stop=stop_after_delay(120), wait=wait_fixed(1), reraise=True)
def subscribe_and_publish_with_retry(self, node_list, pubsub_topic_list): def subscribe_and_publish_with_retry(self, node_list, pubsub_topic_list):

View File

@ -14,11 +14,12 @@ from src.env_vars import (
NODEKEY, NODEKEY,
) )
from src.node.waku_node import WakuNode from src.node.waku_node import WakuNode
from src.steps.common import StepsCommon
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
class StepsSharding: class StepsSharding(StepsCommon):
test_content_topic = "/myapp/1/latest/proto" test_content_topic = "/myapp/1/latest/proto"
test_pubsub_topic = "/waku/2/rs/2/0" test_pubsub_topic = "/waku/2/rs/2/0"
test_payload = "Sharding works!!" test_payload = "Sharding works!!"
@ -46,8 +47,7 @@ class StepsSharding:
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) self.node2.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
if self.node2.is_nwaku(): self.add_node_peer(self.node2, [self.multiaddr_with_id])
self.node2.add_peers([self.multiaddr_with_id])
self.main_relay_nodes.extend([self.node2]) self.main_relay_nodes.extend([self.node2])
@allure.step @allure.step
@ -55,8 +55,7 @@ class StepsSharding:
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs) kwargs = self._resolve_sharding_flags(cluster_id, pubsub_topic, content_topic, **kwargs)
self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id, **kwargs) self.node2.start(relay="false", discv5_bootstrap_node=self.enr_uri, filternode=self.multiaddr_with_id, **kwargs)
if self.node2.is_nwaku(): self.add_node_peer(self.node2, [self.multiaddr_with_id])
self.node2.add_peers([self.multiaddr_with_id])
self.main_filter_nodes.extend([self.node2]) self.main_filter_nodes.extend([self.node2])
@allure.step @allure.step
@ -74,8 +73,7 @@ class StepsSharding:
for index, node in enumerate(nodes): for index, node in enumerate(nodes):
node = WakuNode(node, f"node{index + 3}_{self.test_id}") node = WakuNode(node, f"node{index + 3}_{self.test_id}")
node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
if node.is_nwaku(): self.add_node_peer(node, [self.multiaddr_with_id])
node.add_peers([self.multiaddr_with_id])
self.optional_relay_nodes.append(node) self.optional_relay_nodes.append(node)
@allure.step @allure.step
@ -84,7 +82,7 @@ class StepsSharding:
for index in range(num_nodes): for index in range(num_nodes):
node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}") node = WakuNode(DEFAULT_NWAKU, f"node{index + 3}_{self.test_id}")
node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs) node.start(relay="true", discv5_bootstrap_node=self.enr_uri, **kwargs)
node.add_peers([self.multiaddr_with_id]) self.add_node_peer(node, [self.multiaddr_with_id])
self.optional_relay_nodes.append(node) self.optional_relay_nodes.append(node)
@allure.step @allure.step
@ -225,12 +223,6 @@ class StepsSharding:
except Exception as ex: except Exception as ex:
assert f"Failed to publish: Node not subscribed to topic: {pubsub_topic}" in str(ex) assert f"Failed to publish: Node not subscribed to topic: {pubsub_topic}" in str(ex)
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message
@allure.step @allure.step
def _resolve_sharding_flags(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs): def _resolve_sharding_flags(self, cluster_id=None, pubsub_topic=None, content_topic=None, **kwargs):
if pubsub_topic: if pubsub_topic:

View File

@ -1,24 +1,22 @@
import inspect import inspect
import os
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
from time import time from time import time
import pytest import pytest
import allure import allure
from src.libs.common import to_base64, delay, gen_step_id from src.libs.common import to_base64, delay
from src.node.waku_message import WakuMessage from src.node.waku_message import WakuMessage
from src.env_vars import ( from src.env_vars import (
ADDITIONAL_NODES, ADDITIONAL_NODES,
NODE_1, NODE_1,
NODE_2, NODE_2,
) )
from src.node.waku_node import WakuNode, rln_credential_store_ready from src.node.waku_node import WakuNode
from tenacity import retry, stop_after_delay, wait_fixed from src.steps.common import StepsCommon
from src.test_data import VALID_PUBSUB_TOPICS
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
class StepsStore: class StepsStore(StepsCommon):
test_content_topic = "/myapp/1/latest/proto" test_content_topic = "/myapp/1/latest/proto"
test_pubsub_topic = "/waku/2/rs/0/0" test_pubsub_topic = "/waku/2/rs/0/0"
test_payload = "Store works!!" test_payload = "Store works!!"
@ -31,12 +29,6 @@ class StepsStore:
self.optional_nodes = [] self.optional_nodes = []
self.multiaddr_list = [] self.multiaddr_list = []
@allure.step
def add_node_peer(self, node):
if node.is_nwaku():
for multiaddr in self.multiaddr_list:
node.add_peers([multiaddr])
@allure.step @allure.step
def start_publishing_node(self, image, node_index, **kwargs): def start_publishing_node(self, image, node_index, **kwargs):
node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}") node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}")
@ -45,7 +37,7 @@ class StepsStore:
self.main_publishing_nodes.extend([node]) self.main_publishing_nodes.extend([node])
if kwargs["store"] == "true": if kwargs["store"] == "true":
self.store_nodes.extend([node]) self.store_nodes.extend([node])
self.add_node_peer(node) self.add_node_peer(node, self.multiaddr_list)
self.multiaddr_list.extend([node.get_multiaddr_with_id()]) self.multiaddr_list.extend([node.get_multiaddr_with_id()])
return node return node
@ -56,7 +48,7 @@ class StepsStore:
if kwargs["relay"] == "true": if kwargs["relay"] == "true":
self.main_publishing_nodes.extend([node]) self.main_publishing_nodes.extend([node])
self.store_nodes.extend([node]) self.store_nodes.extend([node])
self.add_node_peer(node) self.add_node_peer(node, self.multiaddr_list)
return node return node
@allure.step @allure.step
@ -187,12 +179,6 @@ class StepsStore:
except Exception as ex: except Exception as ex:
assert "couldn't find any messages" in str(ex) assert "couldn't find any messages" in str(ex)
@allure.step
def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message
@allure.step @allure.step
def create_payload(self, pubsub_topic=None, message=None, **kwargs): def create_payload(self, pubsub_topic=None, message=None, **kwargs):
if message is None: if message is None: