status-go/tests-functional/tests/test_contact_request.py

134 lines
5.8 KiB
Python

from uuid import uuid4
from src.constants import *
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from src.node.status_node import StatusNode
from src.steps.common import StepsCommon
from src.validators.message_validator import MessageValidator
logger = get_custom_logger(__name__)
class TestContactRequest(StepsCommon):
def test_contact_request_baseline(self):
timeout_secs = EVENT_SIGNAL_TIMEOUT_SEC
num_contact_requests = 1
nodes = []
for index in range(num_contact_requests):
first_node = StatusNode(name=f"first_node_{index}")
second_node = StatusNode(name=f"second_node_{index}")
data_dir_first = create_unique_data_dir(os.path.join(PROJECT_ROOT, DATA_DIR), index)
data_dir_second = create_unique_data_dir(os.path.join(PROJECT_ROOT, DATA_DIR), index)
delay(2)
first_node.start(data_dir=data_dir_first)
second_node.start(data_dir=data_dir_second)
account_data_first = {
"rootDataDir": data_dir_first,
"displayName": f"test_user_first_{index}",
"password": f"test_password_first_{index}",
"customizationColor": "primary",
}
account_data_second = {
"rootDataDir": data_dir_second,
"displayName": f"test_user_second_{index}",
"password": f"test_password_second_{index}",
"customizationColor": "primary",
}
first_node.create_account_and_login(account_data_first)
second_node.create_account_and_login(account_data_second)
delay(5)
first_node.start_messenger()
second_node.start_messenger()
first_node.pubkey = first_node.get_pubkey(account_data_first["displayName"])
second_node.pubkey = second_node.get_pubkey(account_data_second["displayName"])
first_node.wait_fully_started()
second_node.wait_fully_started()
nodes.append((first_node, second_node, account_data_first["displayName"], account_data_second["displayName"], index))
missing_contact_requests = []
for first_node, second_node, first_node_display_name, second_node_display_name, index in nodes:
result = self.send_and_wait_for_message((first_node, second_node), first_node_display_name, second_node_display_name, index, timeout_secs)
timestamp, message_id, contact_request_message, response = result
if not response:
missing_contact_requests.append((timestamp, contact_request_message, message_id))
if missing_contact_requests:
formatted_missing_requests = [f"Timestamp: {ts}, Message: {msg}, ID: {mid}" for ts, msg, mid in missing_contact_requests]
raise AssertionError(
f"{len(missing_contact_requests)} contact requests out of {num_contact_requests} didn't reach the peer node: "
+ "\n".join(formatted_missing_requests)
)
def send_and_wait_for_message(self, nodes, first_node_display_name, second_node_display_name, index, timeout=10):
first_node, second_node = nodes
first_node_pubkey = first_node.get_pubkey(first_node_display_name)
contact_request_message = f"contact_request_{index}"
timestamp, message_id, response = self.send_with_timestamp(second_node.send_contact_request, first_node_pubkey, contact_request_message)
validator = MessageValidator(response)
validator.run_all_validations(first_node_pubkey, second_node_display_name, contact_request_message)
try:
messages_new_events = first_node.wait_for_complete_signal("messages.new", timeout)
messages_new_event = None
for event in messages_new_events:
if "chats" in event.get("event", {}):
messages_new_event = event
try:
validator.validate_event_against_response(
messages_new_event,
fields_to_validate={
"text": "text",
"displayName": "displayName",
"id": "id",
},
)
break
except AssertionError as validation_error:
logger.error(f"Validation failed for event: {messages_new_event}, Error: {validation_error}")
continue
if messages_new_event is None:
raise ValueError("No 'messages.new' event with 'chats' data found within the timeout period.")
except (TimeoutError, ValueError) as e:
logger.error(f"Signal validation failed: {str(e)}")
return timestamp, message_id, contact_request_message, None
first_node.stop()
second_node.stop()
return timestamp, message_id, contact_request_message, response
def test_contact_request_with_latency(self):
with self.add_latency():
self.test_contact_request_baseline()
def test_contact_request_with_packet_loss(self):
with self.add_packet_loss():
self.test_contact_request_baseline()
def test_contact_request_with_low_bandwidth(self):
with self.add_low_bandwidth():
self.test_contact_request_baseline()
def test_contact_request_with_node_pause(self, start_2_nodes):
with self.node_pause(self.second_node):
message = str(uuid4())
self.first_node.send_contact_request(self.second_node_pubkey, message)
delay(30)
assert self.second_node.wait_for_signal("messages.new")
assert self.first_node.wait_for_signal("message.delivered")