status-go/tests-functional/clients/status_backend.py
Andrey Bocharnikov 17adba689e
chore(config)_: integrate new rpc providers configs (#6178)
* chore(config)_: Integration of new RPC Provider configurations

* default_networks.go
  * explicit provider initialization with more granular config (rps limiter, order)
  * token overrides made more flexible, support not only infura and grove
* get_status_node.go
  * override status-proxy auth instead of passing override config to rpc/client.go
* config.go
  * ProviderConfig removed
* client.go
  * Now any provider can be enabled/disabled (if user wants to use only his custom RPC urls)
  * Use bearer auth instead of URL auth
  * Provider order is defined by default_networks.go

* Do not store embedded RPC provider credentials in the DB

* FIXME:  TestLoginAndMigrationsStillWorkWithExistingDesktopUser uses deprecated test data (with non-existing related_chain_id in networks table)
2025-01-31 18:12:07 +04:00

315 lines
11 KiB
Python

import io
import json
import logging
import string
import tarfile
import tempfile
import time
import random
import threading
import requests
import docker
import docker.errors
import os
from clients.services.wallet import WalletService
from clients.services.wakuext import WakuextService
from clients.services.accounts import AccountService
from clients.services.settings import SettingsService
from clients.signals import SignalClient, SignalType
from clients.rpc import RpcClient
from conftest import option
from resources.constants import user_1, DEFAULT_DISPLAY_NAME, USER_DIR
from docker.errors import APIError
NANOSECONDS_PER_SECOND = 1_000_000_000
class StatusBackend(RpcClient, SignalClient):
container = None
def __init__(self, await_signals=[], privileged=False):
if option.status_backend_url:
url = option.status_backend_url
else:
self.docker_client = docker.from_env()
host_port = random.choice(option.status_backend_port_range)
self.container = self._start_container(host_port, privileged)
url = f"http://127.0.0.1:{host_port}"
option.status_backend_port_range.remove(host_port)
self.base_url = url
self.api_url = f"{url}/statusgo"
self.ws_url = f"{url}".replace("http", "ws")
self.rpc_url = f"{url}/statusgo/CallRPC"
self.public_key = ""
RpcClient.__init__(self, self.rpc_url)
SignalClient.__init__(self, self.ws_url, await_signals)
self.wait_for_healthy()
websocket_thread = threading.Thread(target=self._connect)
websocket_thread.daemon = True
websocket_thread.start()
self.wallet_service = WalletService(self)
self.wakuext_service = WakuextService(self)
self.accounts_service = AccountService(self)
self.settings_service = SettingsService(self)
def _start_container(self, host_port, privileged):
docker_project_name = option.docker_project_name
identifier = os.environ.get("BUILD_ID") if os.environ.get("CI") else os.popen("git rev-parse --short HEAD").read().strip()
image_name = f"{docker_project_name}-status-backend:latest"
container_name = f"{docker_project_name}-{identifier}-status-backend-{host_port}"
coverage_path = option.codecov_dir if option.codecov_dir else os.path.abspath("./coverage/binary")
container_args = {
"image": image_name,
"detach": True,
"privileged": privileged,
"name": container_name,
"labels": {"com.docker.compose.project": docker_project_name},
"entrypoint": [
"status-backend",
"--address",
"0.0.0.0:3333",
],
"ports": {"3333/tcp": host_port},
"environment": {
"GOCOVERDIR": "/coverage/binary",
},
"volumes": {
coverage_path: {
"bind": "/coverage/binary",
"mode": "rw",
}
},
}
if "FUNCTIONAL_TESTS_DOCKER_UID" in os.environ:
container_args["user"] = os.environ["FUNCTIONAL_TESTS_DOCKER_UID"]
container = self.docker_client.containers.run(**container_args)
network = self.docker_client.networks.get(f"{docker_project_name}_default")
network.connect(container)
option.status_backend_containers.append(container.id)
return container
def wait_for_healthy(self, timeout=10):
start_time = time.time()
while time.time() - start_time <= timeout:
try:
self.health(enable_logging=False)
logging.info(f"StatusBackend is healthy after {time.time() - start_time} seconds")
return
except Exception:
time.sleep(0.1)
raise TimeoutError(f"StatusBackend was not healthy after {timeout} seconds")
def health(self, enable_logging=True):
return self.api_request("health", data=[], url=self.base_url, enable_logging=enable_logging)
def api_request(self, method, data, url=None, enable_logging=True):
url = url if url else self.api_url
url = f"{url}/{method}"
if enable_logging:
logging.info(f"Sending POST request to url {url} with data: {json.dumps(data, sort_keys=True, indent=4)}")
response = requests.post(url, json=data)
if enable_logging:
logging.info(f"Got response: {response.content}")
return response
def verify_is_valid_api_response(self, response):
assert response.status_code == 200, f"Got response {response.content}, status code {response.status_code}"
assert response.content
logging.info(f"Got response: {response.content}")
try:
error = response.json()["error"]
assert not error, f"Error: {error}"
except json.JSONDecodeError:
raise AssertionError(f"Invalid JSON in response: {response.content}")
except KeyError:
pass
def api_valid_request(self, method, data, url=None):
response = self.api_request(method, data, url)
self.verify_is_valid_api_response(response)
return response
def init_status_backend(self, data_dir=USER_DIR):
method = "InitializeApplication"
data = {
"dataDir": data_dir,
"logEnabled": True,
"logLevel": "DEBUG",
"apiLogging": True,
}
return self.api_valid_request(method, data)
def _set_proxy_credentials(self, data):
if "STATUS_BUILD_PROXY_USER" not in os.environ:
return data
user = os.environ["STATUS_BUILD_PROXY_USER"]
password = os.environ["STATUS_BUILD_PROXY_PASSWORD"]
data["StatusProxyMarketUser"] = user
data["StatusProxyMarketPassword"] = password
data["StatusProxyBlockchainUser"] = user
data["StatusProxyBlockchainPassword"] = password
data["StatusProxyEnabled"] = True
data["StatusProxyStageName"] = "test"
return data
def extract_data(self, path: str):
if not self.container:
return path
try:
stream, _ = self.container.get_archive(path)
except docker.errors.NotFound:
return None
temp_dir = tempfile.mkdtemp()
tar_bytes = io.BytesIO(b"".join(stream))
with tarfile.open(fileobj=tar_bytes) as tar:
tar.extractall(path=temp_dir)
# If the tar contains a single file, return the path to that file
# Otherwise it's a directory, just return temp_dir.
if len(tar.getmembers()) == 1:
return os.path.join(temp_dir, tar.getmembers()[0].name)
return temp_dir
def create_account_and_login(
self,
data_dir=USER_DIR,
display_name=None,
password=user_1.password,
):
self.display_name = (
display_name if display_name else f"DISP_NAME_{''.join(random.choices(string.ascii_letters + string.digits + '_-', k=10))}"
)
method = "CreateAccountAndLogin"
data = {
"rootDataDir": data_dir,
"kdfIterations": 256000,
"displayName": self.display_name,
"password": password,
"customizationColor": "primary",
"logEnabled": True,
"logLevel": "DEBUG",
}
data = self._set_proxy_credentials(data)
resp = self.api_valid_request(method, data)
self.node_login_event = self.find_signal_containing_pattern(SignalType.NODE_LOGIN.value, event_pattern=self.display_name)
return resp
def restore_account_and_login(
self,
data_dir=USER_DIR,
display_name=DEFAULT_DISPLAY_NAME,
user=user_1,
network_id=31337,
):
method = "RestoreAccountAndLogin"
data = {
"rootDataDir": data_dir,
"kdfIterations": 256000,
"displayName": display_name,
"password": user.password,
"mnemonic": user.passphrase,
"customizationColor": "blue",
"logEnabled": True,
"logLevel": "DEBUG",
"testNetworksEnabled": False,
"networkId": network_id,
"networksOverride": [
{
"ChainID": network_id,
"ChainName": "Anvil",
"RpcProviders": [
{
"chainId": network_id,
"name": "Anvil Direct",
"url": "http://anvil:8545",
"enableRpsLimiter": False,
"type": "embedded-direct",
"enabled": True,
"authType": "no-auth",
}
],
"ShortName": "eth",
"NativeCurrencyName": "Ether",
"NativeCurrencySymbol": "ETH",
"NativeCurrencyDecimals": 18,
"IsTest": False,
"Layer": 1,
"Enabled": True,
}
],
}
data = self._set_proxy_credentials(data)
return self.api_valid_request(method, data)
def login(self, keyUid, user=user_1):
method = "LoginAccount"
data = {
"password": user.password,
"keyUid": keyUid,
"kdfIterations": 256000,
}
data = self._set_proxy_credentials(data)
return self.api_valid_request(method, data)
def logout(self, user=user_1):
method = "Logout"
return self.api_valid_request(method, {})
def restore_account_and_wait_for_rpc_client_to_start(self, timeout=60):
self.restore_account_and_login()
start_time = time.time()
# ToDo: change this part for waiting for `node.login` signal when websockets are migrated to StatusBackend
while time.time() - start_time <= timeout:
try:
self.accounts_service.get_account_keypairs()
return
except AssertionError:
time.sleep(3)
raise TimeoutError(f"RPC client was not started after {timeout} seconds")
def container_pause(self):
if not self.container:
raise RuntimeError("Container is not initialized.")
self.container.pause()
logging.info(f"Container {self.container.name} paused.")
def container_unpause(self):
if not self.container:
raise RuntimeError("Container is not initialized.")
self.container.unpause()
logging.info(f"Container {self.container.name} unpaused.")
def container_exec(self, command):
if not self.container:
raise RuntimeError("Container is not initialized.")
try:
exec_result = self.container.exec_run(cmd=["sh", "-c", command], stdout=True, stderr=True, tty=False)
if exec_result.exit_code != 0:
raise RuntimeError(f"Failed to execute command in container {self.container.id}:\n" f"OUTPUT: {exec_result.output.decode().strip()}")
return exec_result.output.decode().strip()
except APIError as e:
raise RuntimeError(f"API error during container execution: {str(e)}") from e
def find_public_key(self):
self.public_key = self.node_login_event.get("event", {}).get("settings", {}).get("public-key")