status-go/tests-functional/tests/test_cases.py

267 lines
9.9 KiB
Python

from contextlib import contextmanager
import json
import logging
import threading
import time
from collections import namedtuple
import pytest
from clients.services.wallet import WalletService
from clients.signals import SignalClient, SignalType
from clients.status_backend import RpcClient, StatusBackend
from conftest import option
from resources.constants import user_1, user_2
from resources.enums import MessageContentType
class StatusDTestCase:
network_id = 31337
def setup_method(self):
self.rpc_client = RpcClient(option.rpc_url_statusd)
class StatusBackendTestCase:
await_signals = [SignalType.NODE_LOGIN.value]
network_id = 31337
def setup_class(self):
self.rpc_client = StatusBackend(await_signals=self.await_signals)
self.wallet_service = WalletService(self.rpc_client)
self.rpc_client.init_status_backend()
self.rpc_client.restore_account_and_login()
self.rpc_client.wait_for_login()
class WalletTestCase(StatusBackendTestCase):
def wallet_create_multi_transaction(self, **kwargs):
method = "wallet_createMultiTransaction"
transfer_tx_data = {
"data": "",
"from": user_1.address,
"gas": "0x5BBF",
"input": "",
"maxFeePerGas": "0xbcc0f04fd",
"maxPriorityFeePerGas": "0xbcc0f04fd",
"to": user_2.address,
"type": "0x02",
"value": "0x5af3107a4000",
}
for key, new_value in kwargs.items():
if key in transfer_tx_data:
transfer_tx_data[key] = new_value
else:
logging.info(f"Warning: The key '{key}' does not exist in the transferTx parameters and will be ignored.")
params = [
{
"fromAddress": user_1.address,
"fromAmount": "0x5af3107a4000",
"fromAsset": "ETH",
"type": 0, # MultiTransactionSend
"toAddress": user_2.address,
"toAsset": "ETH",
},
[
{
"bridgeName": "Transfer",
"chainID": 31337,
"transferTx": transfer_tx_data,
}
],
f"{option.password}",
]
return self.rpc_client.rpc_request(method, params)
def send_valid_multi_transaction(self, **kwargs):
response = self.wallet_create_multi_transaction(**kwargs)
tx_hash = None
self.rpc_client.verify_is_valid_json_rpc_response(response)
try:
tx_hash = response.json()["result"]["hashes"][str(self.network_id)][0]
except (KeyError, json.JSONDecodeError):
raise Exception(response.content)
return tx_hash
class TransactionTestCase(WalletTestCase):
def setup_method(self):
self.tx_hash = self.send_valid_multi_transaction()
class EthRpcTestCase(WalletTestCase):
@pytest.fixture(autouse=True, scope="class")
def tx_data(self):
tx_hash = self.send_valid_multi_transaction()
self.wait_until_tx_not_pending(tx_hash)
receipt = self.get_transaction_receipt(tx_hash)
try:
block_number = receipt.json()["result"]["blockNumber"]
block_hash = receipt.json()["result"]["blockHash"]
except (KeyError, json.JSONDecodeError):
raise Exception(receipt.content)
tx_data = namedtuple("TxData", ["tx_hash", "block_number", "block_hash"])
return tx_data(tx_hash, block_number, block_hash)
def get_block_header(self, block_number):
method = "ethclient_headerByNumber"
params = [self.network_id, block_number]
return self.rpc_client.rpc_valid_request(method, params)
def get_transaction_receipt(self, tx_hash):
method = "ethclient_transactionReceipt"
params = [self.network_id, tx_hash]
return self.rpc_client.rpc_valid_request(method, params)
def wait_until_tx_not_pending(self, tx_hash, timeout=10):
method = "ethclient_transactionByHash"
params = [self.network_id, tx_hash]
response = self.rpc_client.rpc_valid_request(method, params)
start_time = time.time()
while response.json()["result"]["isPending"] is True:
time_passed = time.time() - start_time
if time_passed >= timeout:
raise TimeoutError(f"Tx {tx_hash} is still pending after {timeout} seconds")
time.sleep(0.5)
response = self.rpc_client.rpc_valid_request(method, params)
return response.json()["result"]["tx"]
class SignalTestCase(StatusDTestCase):
await_signals = []
def setup_method(self):
super().setup_method()
self.signal_client = SignalClient(option.ws_url_statusd, self.await_signals)
websocket_thread = threading.Thread(target=self.signal_client._connect)
websocket_thread.daemon = True
websocket_thread.start()
class NetworkConditionTestCase:
@contextmanager
def add_latency(self, node, latency=300, jitter=50):
logging.info("Entering context manager: add_latency")
node.container_exec(f"apk add iproute2 && tc qdisc add dev eth0 root netem delay {latency}ms {jitter}ms distribution normal")
try:
yield
finally:
logging.info("Exiting context manager: add_latency")
node.container_exec("tc qdisc del dev eth0 root")
@contextmanager
def add_packet_loss(self, node, packet_loss=2):
logging.info("Entering context manager: add_packet_loss")
node.container_exec(f"apk add iproute2 && tc qdisc add dev eth0 root netem loss {packet_loss}%")
try:
yield
finally:
logging.info("Exiting context manager: add_packet_loss")
node.container_exec("tc qdisc del dev eth0 root netem")
@contextmanager
def add_low_bandwith(self, node, rate="1mbit", burst="32kbit"):
logging.info("Entering context manager: add_low_bandwith")
node.container_exec(f"apk add iproute2 && tc qdisc add dev eth0 root tbf rate {rate} burst {burst}")
try:
yield
finally:
logging.info("Exiting context manager: add_low_bandwith")
node.container_exec("tc qdisc del dev eth0 root")
@contextmanager
def node_pause(self, node):
logging.info("Entering context manager: node_pause")
node.container_pause()
try:
yield
finally:
logging.info("Exiting context manager: node_pause")
node.container_unpause()
class MessengerTestCase(NetworkConditionTestCase):
await_signals = [
SignalType.MESSAGES_NEW.value,
SignalType.MESSAGE_DELIVERED.value,
SignalType.NODE_LOGIN.value,
]
@pytest.fixture(scope="class", autouse=False)
def setup_two_nodes(self, request):
request.cls.sender = self.sender = self.initialize_backend(await_signals=self.await_signals)
request.cls.receiver = self.receiver = self.initialize_backend(await_signals=self.await_signals)
def initialize_backend(self, await_signals):
backend = StatusBackend(await_signals=await_signals)
backend.init_status_backend()
backend.create_account_and_login()
backend.find_public_key()
backend.wakuext_service.start_messenger()
return backend
def make_contacts(self):
existing_contacts = self.receiver.wakuext_service.get_contacts()
if self.sender.public_key in str(existing_contacts):
return
response = self.sender.wakuext_service.send_contact_request(self.receiver.public_key, "contact_request")
expected_message = self.get_message_by_content_type(response, content_type=MessageContentType.CONTACT_REQUEST.value)[0]
self.receiver.find_signal_containing_pattern(SignalType.MESSAGES_NEW.value, event_pattern=expected_message.get("id"))
self.receiver.wakuext_service.accept_contact_request(expected_message.get("id"))
accepted_signal = f"@{self.receiver.public_key} accepted your contact request"
self.sender.find_signal_containing_pattern(SignalType.MESSAGES_NEW.value, event_pattern=accepted_signal)
def validate_signal_event_against_response(self, signal_event, fields_to_validate, expected_message):
expected_message_id = expected_message.get("id")
signal_event_messages = signal_event.get("event", {}).get("messages")
assert len(signal_event_messages) > 0, "No messages found in the signal event"
message = next(
(message for message in signal_event_messages if message.get("id") == expected_message_id),
None,
)
assert message, f"Message with ID {expected_message_id} not found in the signal event"
message_mismatch = []
for response_field, event_field in fields_to_validate.items():
response_value = expected_message[response_field]
event_value = message[event_field]
if response_value != event_value:
message_mismatch.append(f"Field '{response_field}': Expected '{response_value}', Found '{event_value}'")
if not message_mismatch:
return
raise AssertionError(
"Some Sender RPC responses are not matching the signals received by the receiver.\n"
"Details of mismatches:\n" + "\n".join(message_mismatch)
)
def get_message_by_content_type(self, response, content_type, message_pattern=""):
matched_messages = []
messages = response.get("result", {}).get("messages", [])
for message in messages:
if message.get("contentType") != content_type:
continue
if not message_pattern or message_pattern in str(message):
matched_messages.append(message)
if matched_messages:
return matched_messages
else:
raise ValueError(f"Failed to find a message with contentType '{content_type}' in response")