mirror of
https://github.com/status-im/status-go.git
synced 2025-02-04 10:56:00 +00:00
119 lines
5.0 KiB
Python
119 lines
5.0 KiB
Python
import logging
|
|
from uuid import uuid4
|
|
from constants import *
|
|
from src.libs.common import delay
|
|
from src.node.status_node import StatusNode, logger
|
|
from src.steps.common import StepsCommon
|
|
from src.libs.common import create_unique_data_dir, get_project_root
|
|
from validators.contact_request_validator import ContactRequestValidator
|
|
|
|
|
|
class TestContactRequest(StepsCommon):
|
|
def test_contact_request_baseline(self):
|
|
timeout_secs = 10
|
|
num_contact_requests = NUM_CONTACT_REQUESTS
|
|
project_root = get_project_root()
|
|
nodes = []
|
|
LOCAL_DATA = "tests-functional/local"
|
|
|
|
for index in range(num_contact_requests):
|
|
first_node = StatusNode(name=f"first_node_{index}")
|
|
second_node = StatusNode(name=f"second_node_{index}")
|
|
|
|
data_dir_first = create_unique_data_dir(os.path.join(project_root, LOCAL_DATA), index)
|
|
data_dir_second = create_unique_data_dir(os.path.join(project_root, LOCAL_DATA), index)
|
|
|
|
delay(2)
|
|
first_node.start(data_dir=data_dir_first)
|
|
second_node.start(data_dir=data_dir_second)
|
|
|
|
account_data_first = {
|
|
"rootDataDir": data_dir_first,
|
|
"displayName": f"test_user_first_{index}",
|
|
"password": f"test_password_first_{index}",
|
|
"customizationColor": "primary"
|
|
}
|
|
account_data_second = {
|
|
"rootDataDir": data_dir_second,
|
|
"displayName": f"test_user_second_{index}",
|
|
"password": f"test_password_second_{index}",
|
|
"customizationColor": "primary"
|
|
}
|
|
first_node.create_account_and_login(account_data_first)
|
|
second_node.create_account_and_login(account_data_second)
|
|
|
|
delay(5)
|
|
first_node.start_messenger()
|
|
second_node.start_messenger()
|
|
|
|
first_node.pubkey = first_node.get_pubkey(account_data_first["displayName"])
|
|
second_node.pubkey = second_node.get_pubkey(account_data_second["displayName"])
|
|
|
|
first_node.wait_fully_started()
|
|
second_node.wait_fully_started()
|
|
|
|
nodes.append((first_node, second_node, account_data_first["displayName"], index))
|
|
|
|
missing_contact_requests = []
|
|
for first_node, second_node, display_name, index in nodes:
|
|
result = self.send_and_wait_for_message((first_node, second_node), display_name, index, timeout_secs)
|
|
timestamp, message_id, contact_request_message, response = result
|
|
|
|
if not response:
|
|
missing_contact_requests.append((timestamp, contact_request_message, message_id))
|
|
else:
|
|
validator = ContactRequestValidator(response)
|
|
validator.run_all_validations(
|
|
expected_chat_id=first_node.pubkey,
|
|
expected_display_name=display_name,
|
|
expected_text=f"contact_request_{index}"
|
|
)
|
|
|
|
if missing_contact_requests:
|
|
formatted_missing_requests = [
|
|
f"Timestamp: {ts}, Message: {msg}, ID: {mid}" for ts, msg, mid in missing_contact_requests
|
|
]
|
|
raise AssertionError(
|
|
f"{len(missing_contact_requests)} contact requests out of {num_contact_requests} didn't reach the peer node: "
|
|
+ "\n".join(formatted_missing_requests)
|
|
)
|
|
|
|
def send_and_wait_for_message(self, nodes, display_name, index, timeout=10):
|
|
first_node, second_node = nodes
|
|
first_node_pubkey = first_node.get_pubkey(display_name)
|
|
contact_request_message = f"contact_request_{index}"
|
|
|
|
timestamp, message_id, response = self.send_with_timestamp(
|
|
second_node.send_contact_request, first_node_pubkey, contact_request_message
|
|
)
|
|
try:
|
|
first_node.wait_for_signal("history.request.started", None, timeout)
|
|
first_node.wait_for_signal("history.request.completed", None, timeout)
|
|
except TimeoutError as e:
|
|
logging.error(f"Signal validation failed: {str(e)}")
|
|
return timestamp, message_id, contact_request_message, None
|
|
|
|
first_node.stop()
|
|
second_node.stop()
|
|
|
|
return timestamp, message_id, contact_request_message, response
|
|
|
|
def test_contact_request_with_latency(self):
|
|
with self.add_latency():
|
|
self.test_contact_request_baseline()
|
|
|
|
def test_contact_request_with_packet_loss(self):
|
|
with self.add_packet_loss():
|
|
self.test_contact_request_baseline()
|
|
|
|
def test_contact_request_with_low_bandwidth(self):
|
|
with self.add_low_bandwidth():
|
|
self.test_contact_request_baseline()
|
|
|
|
def test_contact_request_with_node_pause(self, start_2_nodes):
|
|
with self.node_pause(self.second_node):
|
|
message = str(uuid4())
|
|
self.first_node.send_contact_request(self.second_node_pubkey, message)
|
|
delay(10)
|
|
assert self.second_node.wait_for_signal("history.request.completed")
|