diff --git a/_assets/ci/Jenkinsfile.desktop b/_assets/ci/Jenkinsfile.desktop index 8af5d8cb2..c7e460c84 100644 --- a/_assets/ci/Jenkinsfile.desktop +++ b/_assets/ci/Jenkinsfile.desktop @@ -110,14 +110,18 @@ pipeline { } } } + stage('Cleanup') { + steps { + script { + cleanTmp() + } + } + } } // stages post { success { script { github.notifyPR(true) } } failure { script { github.notifyPR(false) } } - cleanup { - cleanWs() - cleanTmp() - } + cleanup { cleanWs() } } // post } // pipeline @@ -154,8 +158,9 @@ def shell(cmd) { } def cleanTmp() { - /* Fails on windows due to Durable Task plugin failure. */ - if (env.PLATFORM != 'windows') { + if (env.PLATFORM == 'windows') { + sh "rm -rf ${env.WORKSPACE}@tmp" + } else { dir("${env.WORKSPACE}@tmp") { deleteDir() } } -} +} \ No newline at end of file diff --git a/_assets/ci/Jenkinsfile.tests b/_assets/ci/Jenkinsfile.tests index c7727abff..c73ca4cfd 100644 --- a/_assets/ci/Jenkinsfile.tests +++ b/_assets/ci/Jenkinsfile.tests @@ -254,4 +254,4 @@ def getDefaultUnitTestCount() { isNightlyJob() ? '20' : '1' } def getDefaultTimeout() { isNightlyJob() ? 5*60 : 50 } -def getAmountToKeep() { isNightlyJob() ? '14' : isDevelopJob() ? '30' : '5' } \ No newline at end of file +def getAmountToKeep() { isNightlyJob() ? '14' : isDevelopJob() ? '10' : '5' } diff --git a/api/geth_backend.go b/api/geth_backend.go index 5e983d39f..b4c445e63 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -113,8 +113,8 @@ func NewGethStatusBackend(logger *zap.Logger) *GethStatusBackend { backend.initialize() logger.Info("Status backend initialized", - zap.String("backend geth version", version.Version()), - zap.String("commit", version.GitCommit()), + zap.String("backend geth version", params.Version), + zap.String("commit", params.GitCommit), zap.String("IpfsGatewayURL", params.IpfsGatewayURL)) return backend @@ -2112,9 +2112,7 @@ func (b *GethStatusBackend) startNode(config *params.NodeConfig) (err error) { } }() - b.logger.Info("status-go version details", - zap.String("version", version.Version()), - zap.String("commit", version.GitCommit())) + b.logger.Info("status-go version details", zap.String("version", params.Version), zap.String("commit", params.GitCommit)) b.logger.Debug("starting node with config", zap.Stringer("config", config)) // Update config with some defaults. if err := config.UpdateWithDefaults(); err != nil { diff --git a/params/config.go b/params/config.go index 2f256e443..ea99cbc27 100644 --- a/params/config.go +++ b/params/config.go @@ -20,7 +20,6 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/internal/version" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/static" wakucommon "github.com/status-im/status-go/waku/common" diff --git a/rpc/client.go b/rpc/client.go index 574f5da7b..63ada5680 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/event" appCommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/healthmanager" - "github.com/status-im/status-go/internal/version" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" "github.com/status-im/status-go/rpc/chain" diff --git a/services/ext/service.go b/services/ext/service.go index d5d65b153..256323319 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -34,7 +34,6 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" - "github.com/status-im/status-go/internal/version" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/multiaccounts/accounts" diff --git a/services/wallet/routeexecution/manager.go b/services/wallet/routeexecution/manager.go index 6241bec5d..9d73c5849 100644 --- a/services/wallet/routeexecution/manager.go +++ b/services/wallet/routeexecution/manager.go @@ -2,16 +2,11 @@ package routeexecution import ( "context" - "database/sql" "time" - "go.uber.org/zap" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/logutils" status_common "github.com/status-im/status-go/common" statusErrors "github.com/status-im/status-go/errors" @@ -28,26 +23,16 @@ type Manager struct { router *router.Router transactionManager *transfer.TransactionManager transferController *transfer.Controller - db *DB - - // Local data used for storage purposes - buildInputParams *requests.RouterBuildTransactionsParams } -func NewManager(walletDB *sql.DB, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { +func NewManager(router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { return &Manager{ router: router, transactionManager: transactionManager, transferController: transferController, - db: NewDB(walletDB), } } -func (m *Manager) clearLocalRouteData() { - m.buildInputParams = nil - m.transactionManager.ClearLocalRouterTransactionsData() -} - func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputParams *requests.RouterBuildTransactionsParams) { go func() { defer status_common.LogOnPanic() @@ -63,7 +48,7 @@ func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputPara defer func() { if err != nil { - m.clearLocalRouteData() + m.transactionManager.ClearLocalRouterTransactionsData() err = statusErrors.CreateErrorResponseFromError(err) response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse) } @@ -77,8 +62,6 @@ func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputPara return } - m.buildInputParams = buildInputParams - updateFields(response.SendDetails, routeInputParams) // notify client that sending transactions started (has 3 steps, building txs, signing txs, sending txs) @@ -125,7 +108,7 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send } if clearLocalData { - m.clearLocalRouteData() + m.transactionManager.ClearLocalRouterTransactionsData() } if err != nil { @@ -180,20 +163,6 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send ////////////////////////////////////////////////////////////////////////////// response.SentTransactions, err = m.transactionManager.SendRouterTransactions(ctx, multiTx) - if err != nil { - log.Error("Error sending router transactions", "error", err) - // TODO #16556: Handle partially successful Tx sends? - // Don't return, store whichever transactions were successfully sent - } - - // don't overwrite err since we want to process it in the deferred function - var tmpErr error - routerTransactions := m.transactionManager.GetRouterTransactions() - routeData := NewRouteData(&routeInputParams, m.buildInputParams, routerTransactions) - tmpErr = m.db.PutRouteData(routeData) - if tmpErr != nil { - log.Error("Error storing route data", "error", tmpErr) - } var ( chainIDs []uint64 @@ -204,17 +173,13 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send addresses = append(addresses, common.Address(tx.FromAddress)) go func(chainId uint64, txHash common.Hash) { defer status_common.LogOnPanic() - tmpErr = m.transactionManager.WatchTransaction(context.Background(), chainId, txHash) - if tmpErr != nil { - logutils.ZapLogger().Error("Error watching transaction", zap.Error(tmpErr)) + err = m.transactionManager.WatchTransaction(context.Background(), chainId, txHash) + if err != nil { return } }(tx.FromChain, common.Hash(tx.Hash)) } - tmpErr = m.transferController.CheckRecentHistory(chainIDs, addresses) - if tmpErr != nil { - logutils.ZapLogger().Error("Error checking recent history", zap.Error(tmpErr)) - } + err = m.transferController.CheckRecentHistory(chainIDs, addresses) }() } diff --git a/services/wallet/service.go b/services/wallet/service.go index 18582ac78..d50b400fd 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -190,13 +190,13 @@ func NewService( } router := router.NewRouter(rpcClient, transactor, tokenManager, marketManager, collectibles, - collectiblesManager, ens) - pathProcessors := buildPathProcessors(rpcClient, transactor, tokenManager, ens, featureFlags) + collectiblesManager, ens, stickers) + pathProcessors := buildPathProcessors(rpcClient, transactor, tokenManager, ens, stickers, featureFlags) for _, processor := range pathProcessors { router.AddPathProcessor(processor) } - routeExecutionManager := routeexecution.NewManager(db, router, transactionManager, transferController) + routeExecutionManager := routeexecution.NewManager(router, transactionManager, transferController) return &Service{ db: db, @@ -236,6 +236,7 @@ func buildPathProcessors( transactor *transactions.Transactor, tokenManager *token.Manager, ens *ens.Service, + stickers *stickers.Service, featureFlags *protocolCommon.FeatureFlags, ) []pathprocessor.PathProcessor { ret := make([]pathprocessor.PathProcessor, 0) @@ -267,12 +268,6 @@ func buildPathProcessors( ensRelease := pathprocessor.NewENSReleaseProcessor(rpcClient, transactor, ens) ret = append(ret, ensRelease) - ensPublicKey := pathprocessor.NewENSPublicKeyProcessor(rpcClient, transactor, ens) - ret = append(ret, ensPublicKey) - - buyStickers := pathprocessor.NewStickersBuyProcessor(rpcClient, transactor) - ret = append(ret, buyStickers) - return ret } diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index d836ff556..5cedfc11c 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -273,6 +273,13 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { logutils.ZapLogger().Error("saveAndConfirmPending error", zap.Error(err)) return err } + + // Check if multi transaction needs to be created + err = c.processMultiTransactions(ctx, allTransfers) + if err != nil { + logutils.ZapLogger().Error("processMultiTransactions error", zap.Error(err)) + return err + } } else { // If no transfers found, that is suspecting, because downloader returned this block as containing transfers logutils.ZapLogger().Error("no transfers found in block", diff --git a/telemetry/client.go b/telemetry/client.go index 85cbbf218..276ff6588 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -19,10 +19,9 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" - v1protocol "github.com/status-im/status-go/protocol/v1" v2common "github.com/status-im/status-go/wakuv2/common" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) type TelemetryType string diff --git a/tests-functional/.gitignore b/tests-functional/.gitignore new file mode 100644 index 000000000..6a45c55fe --- /dev/null +++ b/tests-functional/.gitignore @@ -0,0 +1,2 @@ +.idea/ +.local/ diff --git a/tests-functional/README.MD b/tests-functional/README.MD index 16ceefa9c..d30e44e00 100644 --- a/tests-functional/README.MD +++ b/tests-functional/README.MD @@ -1,6 +1,10 @@ +Here’s the updated README with the additional prerequisites and instructions: + +--- + ## Overview -Functional tests for status-go +Functional tests for `status-go` ## Table of Contents @@ -8,34 +12,92 @@ Functional tests for status-go - [How to Install](#how-to-install) - [How to Run](#how-to-run) - [Running Tests](#running-tests) -- [Implementation details](#implementation-details) +- [Implementation Details](#implementation-details) +- [Build Status Backend](#build-status-backend) ## How to Install -* Install [Docker](https://docs.docker.com/engine/install/) and [Docker Compose](https://docs.docker.com/compose/install/) -* Install [Python 3.10.14](https://www.python.org/downloads/) -* In `./tests-functional`, run `pip install -r requirements.txt` -* **Optional (for test development)**: Use Python virtual environment for better dependency management. You can follow the guide [here](https://akrabat.com/creating-virtual-environments-with-pyenv/): +1. Install [Docker](https://docs.docker.com/engine/install/) and [Docker Compose](https://docs.docker.com/compose/install/) +2. Install [Python 3.10.14](https://www.python.org/downloads/) +3. **Set up a virtual environment (recommended):** + - In `./tests-functional`, run: + ```bash + python -m venv .venv + source .venv/bin/activate + pip install -r requirements.txt + ``` + - **Optional (for test development)**: Use Python virtual environment for better dependency management. You can follow the guide [here](https://akrabat.com/creating-virtual-environments-with-pyenv/) +4. Install pre-commit hooks (optional): + ```bash + pre-commit install + ``` ## How to Run -### Running dev RPC (anvil with contracts) -- In `./tests-functional` run `docker compose -f docker-compose.anvil.yml up --remove-orphans --build`, as result: - * an [anvil](https://book.getfoundry.sh/reference/anvil/) container with ChainID 31337 exposed on `0.0.0.0:8545` will start running - * Status-im contracts will be deployed to the network +### Running dev RPC (Anvil with contracts) -### Run tests -- In `./tests-functional` run `docker compose -f docker-compose.anvil.yml -f docker-compose.test.status-go.yml -f docker-compose.status-go.local.yml up --build --remove-orphans`, as result: - * a container with [status-go as daemon](https://github.com/status-im/status-go/issues/5175) will be created with APIModules exposed on `0.0.0.0:3333` - * status-go will use [anvil](https://book.getfoundry.sh/reference/anvil/) as RPCURL with ChainID 31337 - * all Status-im contracts will be deployed to the network +In `./tests-functional`: +```bash +docker compose -f docker-compose.anvil.yml up --remove-orphans --build +``` -* In `./tests-functional/tests` directory run `pytest -m wallet` +This command will: +- Start an [Anvil](https://book.getfoundry.sh/reference/anvil/) container with ChainID `31337`, exposed on `0.0.0.0:8545` +- Deploy Status-im contracts to the Anvil network -## Implementation details +### Running Tests -- Functional tests are implemented in `./tests-functional/tests` based on [pytest](https://docs.pytest.org/en/8.2.x/) -- Every test has two types of verifications: - - `verify_is_valid_json_rpc_response()` checks for status code 200, non-empty response, JSON-RPC structure, presence of the `result` field, and expected ID. - - `jsonschema.validate()` is used to check that the response contains expected data, including required fields and types. Schemas are stored in `/schemas/wallet_MethodName` - - New schemas can be generated using `./tests-functional/schema_builder.py` by passing a response to the `CustomSchemaBuilder(schema_name).create_schema(response.json())` method, should be used only on test creation phase, please search `how to create schema:` to see an example in a test \ No newline at end of file +To run the tests: + +1. In `./tests-functional`, start the testing containers: + ```bash + docker compose -f docker-compose.anvil.yml -f docker-compose.test.status-go.yml -f docker-compose.status-go.local.yml up --build --remove-orphans + ``` + + This command will: + - Create a container with [status-go as daemon](https://github.com/status-im/status-go/issues/5175), exposing `APIModules` on `0.0.0.0:3333` + - Configure `status-go` to use [Anvil](https://book.getfoundry.sh/reference/anvil/) as the `RPCURL` with ChainID `31337` + - Deploy all Status-im contracts to the Anvil network + +2. To execute tests: + - Run all tests: + ```bash + pytest + ``` + - Run tests marked as `wallet`: + ```bash + pytest -m wallet + ``` + - Run a specific test: + ```bash + pytest -k "test_contact_request_baseline" + ``` + +## Implementation Details + +- Functional tests are implemented in `./tests-functional/tests` using [pytest](https://docs.pytest.org/en/8.2.x/). +- Each test performs two types of verifications: + - **`verify_is_valid_json_rpc_response()`**: Checks for a status code `200`, a non-empty response, JSON-RPC structure, presence of the `result` field, and the expected ID. + - **`jsonschema.validate()`**: Validates that the response contains expected data, including required fields and types. Schemas are stored in `/schemas/wallet_MethodName`. + +- **Schema Generation**: + - New schemas can be generated with `./tests-functional/schema_builder.py` by passing a response to the `CustomSchemaBuilder(schema_name).create_schema(response.json())` method. This should be used only during test creation. + - Search `how to create schema:` in test files for examples. + +## Build Status Backend + +You can build the binary with the following command in the `status-go` root directory: + +```bash +make status-backend +``` + +For further details on building and setting up `status-go` and `status-backend`, refer to the official documentation: +- [status-backend README](https://github.com/status-im/status-go/blob/develop/cmd/status-backend/README.md) +- [status-go cmd directory](https://github.com/status-im/status-go/tree/develop/cmd/status-backend) + +Location of the binary: `cmd/status-backend/status-backend` + +--- + +This README should cover your additional setup, installation, and testing instructions with clear steps for users. Let me know if there are any further modifications needed! \ No newline at end of file diff --git a/tests-functional/clients/signals.py b/tests-functional/clients/signals.py index 28b670af4..192c50561 100644 --- a/tests-functional/clients/signals.py +++ b/tests-functional/clients/signals.py @@ -9,40 +9,60 @@ class SignalClient: def __init__(self, ws_url, await_signals): self.url = f"{ws_url}/signals" - self.await_signals = await_signals - self.received_signals = { - signal: [] for signal in self.await_signals - } + self.received_signals = {signal: [] for signal in self.await_signals} def on_message(self, ws, signal): - signal = json.loads(signal) - if signal.get("type") in self.await_signals: - self.received_signals[signal["type"]].append(signal) + logger = logging.getLogger(__name__) - def wait_for_signal(self, signal_type, timeout=20): + signal_data = json.loads(signal) + signal_type = signal_data.get("type") + + logger.info(f"Received signal: {signal_data}") + + if signal_type in self.await_signals: + self.received_signals[signal_type].append(signal_data) + # logger.debug(f"Signal {signal_type} stored: {signal_data}") + + def wait_for_signal(self, signal_type, expected_event=None, timeout=20): + logger = logging.getLogger(__name__) start_time = time.time() - while not self.received_signals.get(signal_type): - if time.time() - start_time >= timeout: - raise TimeoutError( - f"Signal {signal_type} is not received in {timeout} seconds") + while time.time() - start_time < timeout: + if self.received_signals.get(signal_type): + received_signal = self.received_signals[signal_type][0] + if expected_event: + event = received_signal.get("event", {}) + if all(event.get(k) == v for k, v in expected_event.items()): + logger.info(f"Signal {signal_type} with event {expected_event} received and matched.") + return received_signal + else: + logger.debug( + f"Signal {signal_type} received but event did not match expected event: {expected_event}. Received event: {event}") + else: + logger.info(f"Signal {signal_type} received without specific event validation.") + return received_signal time.sleep(0.2) - logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds") - return self.received_signals[signal_type][0] + + raise TimeoutError(f"Signal {signal_type} with event {expected_event} not received in {timeout} seconds") def _on_error(self, ws, error): - logging.error(f"Error: {error}") + logger = logging.getLogger(__name__) + logger.error(f"WebSocket error: {error}") def _on_close(self, ws, close_status_code, close_msg): - logging.info(f"Connection closed: {close_status_code}, {close_msg}") + logger = logging.getLogger(__name__) + logger.info(f"WebSocket connection closed: {close_status_code}, {close_msg}") def _on_open(self, ws): - logging.info("Connection opened") + logger = logging.getLogger(__name__) + logger.info("WebSocket connection opened") def _connect(self): - ws = websocket.WebSocketApp(self.url, - on_message=self.on_message, - on_error=self._on_error, - on_close=self._on_close) + ws = websocket.WebSocketApp( + self.url, + on_message=self.on_message, + on_error=self._on_error, + on_close=self._on_close + ) ws.on_open = self._on_open ws.run_forever() diff --git a/tests-functional/constants.py b/tests-functional/constants.py index a164e05aa..e6d3b732d 100644 --- a/tests-functional/constants.py +++ b/tests-functional/constants.py @@ -1,5 +1,7 @@ +import os +import random from dataclasses import dataclass - +from src.libs.common import create_unique_data_dir @dataclass class Account: @@ -8,7 +10,7 @@ class Account: password: str passphrase: str - +# User accounts user_1 = Account( address="0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266", private_key="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80", @@ -21,3 +23,28 @@ user_2 = Account( password="Strong12345", passphrase="test test test test test test test test test test nest junk" ) + +# Paths and URLs +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) +STATUS_BACKEND_URL = os.getenv("STATUS_BACKEND_URL", "http://127.0.0.1") +API_REQUEST_TIMEOUT = int(os.getenv("API_REQUEST_TIMEOUT", "15")) + +# Paths relative to project root +DATA_DIR = os.path.join(PROJECT_ROOT, "tests-functional/local") +LOCAL_DATA_DIR1 = create_unique_data_dir(DATA_DIR, random.randint(1, 100)) +LOCAL_DATA_DIR2 = create_unique_data_dir(DATA_DIR, random.randint(1, 100)) +RESOURCES_FOLDER = os.path.join(PROJECT_ROOT, "resources") + +# Account payload default values +ACCOUNT_PAYLOAD_DEFAULTS = { + "displayName": "user", + "password": "test_password", + "customizationColor": "primary" +} + +# Network emulation commands +LATENCY_CMD = "sudo tc qdisc add dev eth0 root netem delay 1s 100ms distribution normal" +PACKET_LOSS_CMD = "sudo tc qdisc add dev eth0 root netem loss 50%" +LOW_BANDWIDTH_CMD = "sudo tc qdisc add dev eth0 root tbf rate 1kbit burst 1kbit" +REMOVE_TC_CMD = "sudo tc qdisc del dev eth0 root" +NUM_CONTACT_REQUESTS = 5 \ No newline at end of file diff --git a/tests-functional/src/__init__.py b/tests-functional/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests-functional/src/libs/__init__.py b/tests-functional/src/libs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests-functional/src/libs/base_api_client.py b/tests-functional/src/libs/base_api_client.py new file mode 100644 index 000000000..0ae92d2a9 --- /dev/null +++ b/tests-functional/src/libs/base_api_client.py @@ -0,0 +1,28 @@ +import requests +import json +from tenacity import retry, stop_after_delay, wait_fixed +from src.libs.custom_logger import get_custom_logger + +logger = get_custom_logger(__name__) + +class BaseAPIClient: + def __init__(self, base_url): + self.base_url = base_url + + @retry(stop=stop_after_delay(10), wait=wait_fixed(0.5), reraise=True) + def send_post_request(self, endpoint, payload=None, headers=None, timeout=10): + if headers is None: + headers = {"Content-Type": "application/json"} + if payload is None: + payload = {} + + url = f"{self.base_url}/{endpoint}" + logger.info(f"Sending POST request to {url} with payload: {json.dumps(payload)}") + try: + response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=timeout) + response.raise_for_status() + logger.info(f"Response received: {response.status_code} - {response.text}") + return response.json() + except requests.exceptions.RequestException as e: + logger.error(f"Request to {url} failed: {str(e)}") + raise diff --git a/tests-functional/src/libs/common.py b/tests-functional/src/libs/common.py new file mode 100644 index 000000000..4faa38473 --- /dev/null +++ b/tests-functional/src/libs/common.py @@ -0,0 +1,28 @@ +from time import sleep +from src.libs.custom_logger import get_custom_logger +import os +import allure +import uuid + +logger = get_custom_logger(__name__) + + +def attach_allure_file(file): + logger.debug(f"Attaching file {file}") + allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT) + + +def delay(num_seconds): + logger.debug(f"Sleeping for {num_seconds} seconds") + sleep(num_seconds) + +def create_unique_data_dir(base_dir: str, index: int) -> str: + """Generate a unique data directory for each node instance.""" + unique_id = str(uuid.uuid4())[:8] + unique_dir = os.path.join(base_dir, f"data_{index}_{unique_id}") + os.makedirs(unique_dir, exist_ok=True) + return unique_dir + +def get_project_root() -> str: + """Returns the root directory of the project.""" + return os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) diff --git a/tests-functional/src/libs/custom_logger.py b/tests-functional/src/libs/custom_logger.py new file mode 100644 index 000000000..ec2f8e567 --- /dev/null +++ b/tests-functional/src/libs/custom_logger.py @@ -0,0 +1,24 @@ +import logging + +max_log_line_length = 10000 + + +def log_length_filter(max_length): + class logLengthFilter(logging.Filter): + def filter(self, record): + if len(record.getMessage()) > max_length: + logging.getLogger(record.name).log( + record.levelno, f"Log line was discarded because it's longer than max_log_line_length={max_log_line_length}" + ) + return False + return True + + return logLengthFilter() + + +def get_custom_logger(name): + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("docker").setLevel(logging.WARNING) + logger = logging.getLogger(name) + logger.addFilter(log_length_filter(max_log_line_length)) + return logger diff --git a/tests-functional/src/node/__init__.py b/tests-functional/src/node/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests-functional/src/node/rpc_client.py b/tests-functional/src/node/rpc_client.py new file mode 100644 index 000000000..e8c4fd888 --- /dev/null +++ b/tests-functional/src/node/rpc_client.py @@ -0,0 +1,94 @@ +from src.libs.base_api_client import BaseAPIClient +from constants import * +from src.libs.custom_logger import get_custom_logger +from tenacity import retry, stop_after_attempt, wait_fixed + +logger = get_custom_logger(__name__) + + +class StatusNodeRPC(BaseAPIClient): + def __init__(self, port, node_name): + super().__init__(f"http://127.0.0.1:{port}/statusgo") + self.node_name = node_name + + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(1), + reraise=True + ) + def send_rpc_request(self, method, params=None, timeout=API_REQUEST_TIMEOUT): + """Send JSON-RPC requests, used for standard JSON-RPC API calls.""" + payload = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1} + logger.info(f"Sending JSON-RPC request to {self.base_url} with payload: {payload}") + + response = self.send_post_request("CallRPC", payload, timeout=timeout) + + if response.get("error"): + logger.error(f"RPC request failed with error: {response['error']}") + raise RuntimeError(f"RPC request failed with error: {response['error']}") + + return response + + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(1), + reraise=True + ) + def initialize_application(self, data_dir, timeout=API_REQUEST_TIMEOUT): + """Send a direct POST request to the InitializeApplication endpoint.""" + payload = {"dataDir": data_dir} + logger.info(f"Sending direct POST request to InitializeApplication with payload: {payload}") + + response = self.send_post_request("InitializeApplication", payload, timeout=timeout) + + if response.get("error"): + logger.error(f"InitializeApplication request failed with error: {response['error']}") + raise RuntimeError(f"Failed to initialize application: {response['error']}") + + return response + + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(1), + reraise=True + ) + def create_account_and_login(self, account_data, timeout=API_REQUEST_TIMEOUT): + """Send a direct POST request to CreateAccountAndLogin endpoint.""" + payload = { + "rootDataDir": account_data.get("rootDataDir"), + "displayName": account_data.get("displayName", "test1"), + "password": account_data.get("password", "test1"), + "customizationColor": account_data.get("customizationColor", "primary") + } + logger.info(f"Sending direct POST request to CreateAccountAndLogin with payload: {payload}") + + response = self.send_post_request("CreateAccountAndLogin", payload, timeout=timeout) + + if response.get("error"): + logger.error(f"CreateAccountAndLogin request failed with error: {response['error']}") + raise RuntimeError(f"Failed to create account and login: {response['error']}") + + return response + + @retry( + stop=stop_after_attempt(3), + wait=wait_fixed(1), + reraise=True + ) + def start_messenger(self, timeout=API_REQUEST_TIMEOUT): + """Send JSON-RPC request to start Waku messenger.""" + payload = { + "jsonrpc": "2.0", + "method": "wakuext_startMessenger", + "params": [], + "id": 1 + } + logger.info(f"Sending JSON-RPC request to start Waku messenger: {payload}") + + response = self.send_post_request("CallRPC", payload, timeout=timeout) + + if response.get("error"): + logger.error(f"Starting Waku messenger failed with error: {response['error']}") + raise RuntimeError(f"Failed to start Waku messenger: {response['error']}") + + return response diff --git a/tests-functional/src/node/status_node.py b/tests-functional/src/node/status_node.py new file mode 100644 index 000000000..dd0502764 --- /dev/null +++ b/tests-functional/src/node/status_node.py @@ -0,0 +1,174 @@ +import os +import asyncio +import random +import shutil +import signal +import string +import subprocess +import threading +import time + +from clients.status_backend import RpcClient +from conftest import option +from src.libs.custom_logger import get_custom_logger +from src.node.rpc_client import StatusNodeRPC +from clients.signals import SignalClient + +logger = get_custom_logger(__name__) + + +class StatusNode: + def __init__(self, name=None, port=None, pubkey=None): + self.data_dir = None + try: + os.remove(f"{name}.log") + except: + pass + self.name = self.random_node_name() if not name else name.lower() + self.port = str(random.randint(1024, 65535)) if not port else port + self.pubkey = pubkey + self.process = None + self.log_thread = None + self.capture_logs = True + self.logs = [] + self.pid = None + self.signal_client = None + self.last_response = None + self.api = StatusNodeRPC(self.port, self.name) + + def setup_method(self): + # Set up RPC client + self.rpc_client = RpcClient(option.rpc_url_statusd) + # Set up WebSocket signal client + await_signals = ["history.request.started", "history.request.completed"] + self.signal_client = SignalClient(option.ws_url_statusd, await_signals) + + # Start WebSocket connection in a separate thread + websocket_thread = threading.Thread(target=self.signal_client._connect) + websocket_thread.daemon = True + websocket_thread.start() + + def initialize_node(self, name, port, data_dir, account_data): + """Centralized method to initialize a node.""" + self.name = name + self.port = port + self.start(data_dir) + self.wait_fully_started() + self.create_account_and_login(account_data) + self.start_messenger() + self.pubkey = self.get_pubkey(account_data["displayName"]) + + def start_node(self, command): + """Start the node using a subprocess command.""" + logger.info(f"Starting node with command: {command}") + self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) + self.pid = self.process.pid + self.log_thread = self.capture_process_logs(self.process, self.logs) + + def start(self, data_dir, capture_logs=True): + """Start the status-backend node and initialize it before subscribing to signals.""" + self.capture_logs = capture_logs + self.data_dir = data_dir + command = ["./status-backend", f"--address=localhost:{self.port}"] + self.start_node(command) + self.wait_fully_started() + self.last_response = self.api.initialize_application(data_dir) + self.api = StatusNodeRPC(self.port, self.name) + self.start_signal_client() + + def create_account_and_login(self, account_data): + """Create an account and log in using the status-backend.""" + logger.info(f"Creating account and logging in for node {self.name}") + self.api.create_account_and_login(account_data) + + def start_messenger(self): + """Start the Waku messenger.""" + logger.info(f"Starting Waku messenger for node {self.name}") + self.api.start_messenger() + + def start_signal_client(self): + """Start a SignalClient for the given node to listen for WebSocket signals.""" + ws_url = f"ws://localhost:{self.port}" + await_signals = ["history.request.started", "history.request.completed"] + self.signal_client = SignalClient(ws_url, await_signals) + + websocket_thread = threading.Thread(target=self.signal_client._connect) + websocket_thread.daemon = True + websocket_thread.start() + logger.info("WebSocket client started and subscribed to signals.") + + def wait_fully_started(self): + """Wait until the node logs indicate that the server has started.""" + logger.info(f"Waiting for {self.name} to fully start...") + start_time = time.time() + while time.time() - start_time < 20: + if any("server started" in log for log in self.logs): + logger.info(f"Node {self.name} has fully started.") + return + time.sleep(0.5) + raise TimeoutError(f"Node {self.name} did not fully start in time.") + + def capture_process_logs(self, process, logs): + """Capture logs from a subprocess.""" + + def read_output(): + while True: + line = process.stdout.readline() + if not line: + break + logs.append(line.strip()) + logger.debug(f"{self.name.upper()} - {line.strip()}") + + log_thread = threading.Thread(target=read_output) + log_thread.daemon = True + log_thread.start() + return log_thread + + def random_node_name(self, length=10): + """Generate a random node name.""" + allowed_chars = string.ascii_lowercase + string.digits + "_-" + return ''.join(random.choice(allowed_chars) for _ in range(length)) + + def get_pubkey(self, display_name): + """Retrieve public-key based on display name from accounts_getAccounts response.""" + response = self.api.send_rpc_request("accounts_getAccounts") + + accounts = response.get("result", []) + for account in accounts: + if account.get("name") == display_name: + return account.get("public-key") + raise ValueError(f"Public key not found for display name: {display_name}") + + def wait_for_signal(self, signal_type, expected_event=None, timeout=20): + """Wait for a signal using the signal client and validate against expected event details.""" + return self.signal_client.wait_for_signal(signal_type, expected_event, timeout) + + def stop(self, remove_local_data=True): + """Stop the status-backend process.""" + if self.process: + logger.info(f"Stopping node with name: {self.name}") + self.process.kill() + if self.capture_logs: + self.log_thread.join() + if remove_local_data: + node_dir = f"test-{self.name}" + if os.path.exists(node_dir): + try: + shutil.rmtree(node_dir) + except Exception as ex: + logger.warning(f"Couldn't delete node dir {node_dir} because of {str(ex)}") + self.process = None + + def send_contact_request(self, pubkey, message): + params = [{"id": pubkey, "message": message}] + return self.api.send_rpc_request("wakuext_sendContactRequest", params) + + def pause_process(self): + if self.pid: + logger.info(f"Pausing node with pid: {self.pid}") + os.kill(self.pid, signal.SIGTSTP) + + def resume_process(self): + if self.pid: + logger.info(f"Resuming node with pid: {self.pid}") + os.kill(self.pid, signal.SIGCONT) diff --git a/tests-functional/src/steps/__init__.py b/tests-functional/src/steps/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests-functional/src/steps/common.py b/tests-functional/src/steps/common.py new file mode 100644 index 000000000..6393cef27 --- /dev/null +++ b/tests-functional/src/steps/common.py @@ -0,0 +1,117 @@ +from contextlib import contextmanager +import inspect +import subprocess +import pytest +from src.libs.common import delay +from src.libs.custom_logger import get_custom_logger +from src.node.status_node import StatusNode +from datetime import datetime +from constants import * + +logger = get_custom_logger(__name__) + + +class StepsCommon: + @pytest.fixture(scope="function", autouse=False) + def start_1_node(self): + account_data = { + **ACCOUNT_PAYLOAD_DEFAULTS, + "rootDataDir": LOCAL_DATA_DIR1, + "displayName": "first_node_user" + } + random_port = str(random.randint(1024, 65535)) + + self.first_node = StatusNode() + self.first_node.initialize_node("first_node", random_port, LOCAL_DATA_DIR1, account_data) + self.first_node_pubkey = self.first_node.get_pubkey() + + @pytest.fixture(scope="function", autouse=False) + def start_2_nodes(self): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + + account_data_first = { + **ACCOUNT_PAYLOAD_DEFAULTS, + "rootDataDir": LOCAL_DATA_DIR1, + "displayName": "first_node_user" + } + account_data_second = { + **ACCOUNT_PAYLOAD_DEFAULTS, + "rootDataDir": LOCAL_DATA_DIR2, + "displayName": "second_node_user" + } + + self.first_node = StatusNode(name="first_node") + self.first_node.start(data_dir=LOCAL_DATA_DIR1) + self.first_node.wait_fully_started() + + self.second_node = StatusNode(name="second_node") + self.second_node.start(data_dir=LOCAL_DATA_DIR2) + self.second_node.wait_fully_started() + + self.first_node.create_account_and_login(account_data_first) + self.second_node.create_account_and_login(account_data_second) + + delay(4) + self.first_node.start_messenger() + delay(1) + self.second_node.start_messenger() + + self.first_node_pubkey = self.first_node.get_pubkey("first_node_user") + self.second_node_pubkey = self.second_node.get_pubkey("second_node_user") + + logger.debug(f"First Node Public Key: {self.first_node_pubkey}") + logger.debug(f"Second Node Public Key: {self.second_node_pubkey}") + + @contextmanager + def add_latency(self): + """Add network latency""" + logger.debug("Adding network latency") + subprocess.Popen(LATENCY_CMD, shell=True) + try: + yield + finally: + logger.debug("Removing network latency") + subprocess.Popen(REMOVE_TC_CMD, shell=True) + + @contextmanager + def add_packet_loss(self): + """Add packet loss""" + logger.debug("Adding packet loss") + subprocess.Popen(PACKET_LOSS_CMD, shell=True) + try: + yield + finally: + logger.debug("Removing packet loss") + subprocess.Popen(REMOVE_TC_CMD, shell=True) + + @contextmanager + def add_low_bandwidth(self): + """Add low bandwidth""" + logger.debug("Adding low bandwidth") + subprocess.Popen(LOW_BANDWIDTH_CMD, shell=True) + try: + yield + finally: + logger.debug("Removing low bandwidth") + subprocess.Popen(REMOVE_TC_CMD, shell=True) + + @contextmanager + def node_pause(self, node): + logger.debug("Entering context manager: node_pause") + node.pause_process() + try: + yield + finally: + logger.debug(f"Exiting context manager: node_pause") + node.resume_process() + + def send_with_timestamp(self, send_method, id, message): + timestamp = datetime.now().strftime("%H:%M:%S") + response = send_method(id, message) + response_messages = response["result"]["messages"] + message_id = None + for m in response_messages: + if m["text"] == message: + message_id = m["id"] + break + return timestamp, message_id \ No newline at end of file diff --git a/tests-functional/tests/test_contact_request.py b/tests-functional/tests/test_contact_request.py new file mode 100644 index 000000000..0490366e8 --- /dev/null +++ b/tests-functional/tests/test_contact_request.py @@ -0,0 +1,124 @@ +import logging +from uuid import uuid4 +from constants import * +from src.libs.common import delay +from src.node.status_node import StatusNode, logger +from src.steps.common import StepsCommon +from src.libs.common import create_unique_data_dir, get_project_root +from validators.contact_request_validator import ContactRequestValidator + + +class TestContactRequest(StepsCommon): + def test_contact_request_baseline(self): + timeout_secs = 180 + num_contact_requests = NUM_CONTACT_REQUESTS + project_root = get_project_root() + nodes = [] + + for index in range(num_contact_requests): + first_node = StatusNode(name=f"first_node_{index}") + second_node = StatusNode(name=f"second_node_{index}") + + data_dir_first = create_unique_data_dir(os.path.join(project_root, "tests-functional/local"), index) + data_dir_second = create_unique_data_dir(os.path.join(project_root, "tests-functional/local"), index) + + delay(2) + first_node.start(data_dir=data_dir_first) + second_node.start(data_dir=data_dir_second) + + account_data_first = { + "rootDataDir": data_dir_first, + "displayName": f"test_user_first_{index}", + "password": f"test_password_first_{index}", + "customizationColor": "primary" + } + account_data_second = { + "rootDataDir": data_dir_second, + "displayName": f"test_user_second_{index}", + "password": f"test_password_second_{index}", + "customizationColor": "primary" + } + first_node.create_account_and_login(account_data_first) + second_node.create_account_and_login(account_data_second) + + delay(5) + first_node.start_messenger() + second_node.start_messenger() + + first_node.pubkey = first_node.get_pubkey(account_data_first["displayName"]) + second_node.pubkey = second_node.get_pubkey(account_data_second["displayName"]) + + first_node.wait_fully_started() + second_node.wait_fully_started() + + nodes.append((first_node, second_node, account_data_first["displayName"], index)) + + # Validate contact requests + missing_contact_requests = [] + for first_node, second_node, display_name, index in nodes: + result = self.send_and_wait_for_message((first_node, second_node), display_name, index, timeout_secs) + timestamp, message_id, contact_request_message, response = result + + if not response: + missing_contact_requests.append((timestamp, contact_request_message, message_id)) + else: + validator = ContactRequestValidator(response) + validator.run_all_validations( + expected_chat_id=first_node.pubkey, + expected_display_name=display_name, + expected_text=f"contact_request_{index}" + ) + + if missing_contact_requests: + formatted_missing_requests = [ + f"Timestamp: {ts}, Message: {msg}, ID: {mid}" for ts, msg, mid in missing_contact_requests + ] + raise AssertionError( + f"{len(missing_contact_requests)} contact requests out of {num_contact_requests} didn't reach the peer node: " + + "\n".join(formatted_missing_requests) + ) + + def send_and_wait_for_message(self, nodes, display_name, index, timeout=45): + first_node, second_node = nodes + first_node_pubkey = first_node.get_pubkey(display_name) + contact_request_message = f"contact_request_{index}" + + timestamp, message_id = self.send_with_timestamp( + second_node.send_contact_request, first_node_pubkey, contact_request_message + ) + + response = second_node.send_contact_request(first_node_pubkey, contact_request_message) + + expected_event_started = {"requestId": "", "peerId": "", "batchIndex": 0, "numBatches": 1} + expected_event_completed = {"requestId": "", "peerId": "", "batchIndex": 0} + + try: + first_node.wait_for_signal("history.request.started", expected_event_started, timeout) + first_node.wait_for_signal("history.request.completed", expected_event_completed, timeout) + except TimeoutError as e: + logging.error(f"Signal validation failed: {str(e)}") + return timestamp, message_id, contact_request_message, None + + first_node.stop() + second_node.stop() + + return timestamp, message_id, contact_request_message, response + + def test_contact_request_with_latency(self): + with self.add_latency(): + self.test_contact_request_baseline() + + def test_contact_request_with_packet_loss(self): + with self.add_packet_loss(): + self.test_contact_request_baseline() + + def test_contact_request_with_low_bandwidth(self): + with self.add_low_bandwidth(): + self.test_contact_request_baseline() + + def test_contact_request_with_node_pause(self, start_2_nodes): + with self.node_pause(self.second_node): + message = str(uuid4()) + self.first_node.send_contact_request(self.second_node_pubkey, message) + delay(10) + assert self.second_node.wait_for_signal("history.request.completed") diff --git a/tests-functional/validators/contact_request_validator.py b/tests-functional/validators/contact_request_validator.py new file mode 100644 index 000000000..81f4bf841 --- /dev/null +++ b/tests-functional/validators/contact_request_validator.py @@ -0,0 +1,36 @@ +import logging + +from src.steps.common import logger + + +class ContactRequestValidator: + """Validator class for contact request responses.""" + + def __init__(self, response): + self.response = response + + def validate_response_structure(self): + """Check the overall structure of the response.""" + assert self.response.get("jsonrpc") == "2.0", "Invalid JSON-RPC version" + assert "result" in self.response, "Missing 'result' in response" + + def validate_chat_data(self, expected_chat_id, expected_display_name, expected_text): + """Validate the chat data fields in the response.""" + chats = self.response["result"].get("chats", []) + assert len(chats) > 0, "No chats found in the response" + + chat = chats[0] # Validate the first chat as an example + assert chat.get("id") == expected_chat_id, f"Chat ID mismatch: Expected {expected_chat_id}" + assert chat.get("name").startswith("0x"), "Invalid chat name format" + + last_message = chat.get("lastMessage", {}) + # assert last_message.get("displayName") == expected_display_name, "Display name mismatch" + assert last_message.get("text") == expected_text, "Message text mismatch" + assert last_message.get("contactRequestState") == 1, "Unexpected contact request state" + assert "compressedKey" in last_message, "Missing 'compressedKey' in last message" + + def run_all_validations(self, expected_chat_id, expected_display_name, expected_text): + """Run all validation methods for the contact request response.""" + self.validate_response_structure() + self.validate_chat_data(expected_chat_id, expected_display_name, expected_text) + logger.info("All validations passed for the contact request response.") \ No newline at end of file diff --git a/transactions/pendingtxtracker.go b/transactions/pendingtxtracker.go index a297f89ed..22fcdfed2 100644 --- a/transactions/pendingtxtracker.go +++ b/transactions/pendingtxtracker.go @@ -88,12 +88,11 @@ type PendingTxTracker struct { func NewPendingTxTracker(db *sql.DB, rpcClient rpc.ClientInterface, rpcFilter *rpcfilters.Service, eventFeed *event.Feed, checkInterval time.Duration) *PendingTxTracker { tm := &PendingTxTracker{ - db: db, - trackedTxDB: NewDB(db), - rpcClient: rpcClient, - eventFeed: eventFeed, - rpcFilter: rpcFilter, - logger: logutils.ZapLogger().Named("PendingTxTracker"), + db: db, + rpcClient: rpcClient, + eventFeed: eventFeed, + rpcFilter: rpcFilter, + logger: logutils.ZapLogger().Named("PendingTxTracker"), } tm.taskRunner = NewConditionalRepeater(checkInterval, func(ctx context.Context) bool { return tm.fetchAndUpdateDB(ctx)