121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
import json
|
|
import logging
|
|
import jsonschema
|
|
import uuid
|
|
|
|
|
|
from conftest import option
|
|
from resources.constants import user_1, user_2
|
|
|
|
|
|
def verify_json_schema(response, method):
    """Validate ``response`` against the JSON schema stored for ``method``.

    The schema is read from ``{option.base_dir}/schemas/{method}`` and handed,
    together with ``response``, to ``jsonschema.validate``.

    Args:
        response: The instance to validate.
        method: Name of the schema file inside the ``schemas`` directory.

    Raises:
        Nothing is caught here: errors from ``open``, ``json.load`` and
        ``jsonschema.validate`` propagate to the caller.
    """
    # Decode explicitly as UTF-8 so that parsing the schema does not depend on
    # the locale-specific default encoding of the machine running the tests.
    with open(f"{option.base_dir}/schemas/{method}", "r", encoding="utf-8") as schema_file:
        jsonschema.validate(instance=response, schema=json.load(schema_file))
|
|
|
|
|
|
def get_suggested_routes(rpc_client, **kwargs):
    """Request suggested routes and return the resulting signal event.

    A default parameter set for ``wallet_getSuggestedRoutesAsync`` is built
    (from ``user_1.address`` to ``user_2.address`` with a freshly generated
    uuid) and then patched with ``kwargs``.

    Args:
        rpc_client: Object providing ``prepare_wait_for_signal``,
            ``rpc_valid_request`` and ``wait_for_signal``.
        **kwargs: Overrides for keys that already exist in the default
            parameter set. Unknown keys are logged and ignored.

    Returns:
        The ``"event"`` entry of the ``wallet.suggested.routes`` signal.

    Raises:
        AssertionError: If the event's ``Uuid`` differs from the uuid that was
            sent in the request.
    """
    _uuid = str(uuid.uuid4())
    # 0xde0b6b3a7640000 == 10**18 (presumably 1 ETH expressed in wei).
    amount_in = "0xde0b6b3a7640000"

    method = "wallet_getSuggestedRoutesAsync"
    input_params = {
        "uuid": _uuid,
        "sendType": 0,
        "addrFrom": user_1.address,
        "addrTo": user_2.address,
        "amountIn": amount_in,
        "amountOut": "0x0",
        "tokenID": "ETH",
        "tokenIDIsOwnerToken": False,
        "toTokenID": "",
        "disabledFromChainIDs": [10, 42161],
        "disabledToChainIDs": [10, 42161],
        "gasFeeMode": 1,
        "fromLockedAmount": {},
    }
    for key, new_value in kwargs.items():
        if key in input_params:
            input_params[key] = new_value
        else:
            # Logged at WARNING: an INFO record is dropped by the default
            # logging configuration, which would hide the ignored override.
            logging.warning(f"Warning: The key '{key}' does not exist in the input_params parameters and will be ignored.")
    params = [input_params]

    # The signal expectation is registered before the request is sent.
    rpc_client.prepare_wait_for_signal("wallet.suggested.routes", 1)
    rpc_client.rpc_valid_request(method, params)

    routes = rpc_client.wait_for_signal("wallet.suggested.routes")
    # Compare with the uuid that was actually sent: a caller may have
    # overridden the generated one through ``kwargs``.
    assert routes["event"]["Uuid"] == input_params["uuid"]

    return routes["event"]
|
|
|
|
|
|
def build_transactions_from_route(rpc_client, uuid, **kwargs):
    """Build the transactions for a route and return the signing event.

    Sends ``wallet_buildTransactionsFromRoute`` and then waits for the
    ``wallet.router.sign-transactions`` signal.

    Args:
        rpc_client: Object providing ``rpc_valid_request`` and
            ``wait_for_signal``.
        uuid: Route identifier sent as the ``"uuid"`` request field.
        **kwargs: Overrides for keys that already exist in the default request
            (``slippagePercentage``). Unknown keys are logged and ignored.

    Returns:
        The ``"event"`` entry of the ``wallet.router.sign-transactions`` signal.

    Raises:
        AssertionError: If the event reports ``signOnKeycard`` as anything
            other than ``False``, or if its list of hashes is empty.
    """
    method = "wallet_buildTransactionsFromRoute"
    build_tx_params = {"uuid": uuid, "slippagePercentage": 0}
    for key, new_value in kwargs.items():
        if key in build_tx_params:
            build_tx_params[key] = new_value
        else:
            # Logged at WARNING: an INFO record is dropped by the default
            # logging configuration, which would hide the ignored override.
            logging.warning(f"Warning: The key '{key}' does not exist in the build_tx_params parameters and will be ignored.")
    params = [build_tx_params]
    rpc_client.rpc_valid_request(method, params)

    wallet_router_sign_transactions = rpc_client.wait_for_signal("wallet.router.sign-transactions")

    assert wallet_router_sign_transactions["event"]["signingDetails"]["signOnKeycard"] is False
    transaction_hashes = wallet_router_sign_transactions["event"]["signingDetails"]["hashes"]

    assert transaction_hashes, "Transaction hashes are empty!"

    return wallet_router_sign_transactions["event"]
|
|
|
|
|
|
def sign_messages(rpc_client, hashes):
    """Sign every hash via ``wallet_signMessage`` and split each signature.

    Each hash is signed with ``user_1.address`` and ``option.password``.
    The ``0x`` prefix of the returned signature is dropped and the remainder
    is cut into three pieces by character position.

    Args:
        rpc_client: Object providing ``rpc_valid_request``; the value it
            returns must offer ``.json()``.
        hashes: Iterable of hashes to sign.

    Returns:
        A dict mapping each hash to ``{"r": ..., "s": ..., "v": ...}``, where
        ``r`` is the first 64 characters after the prefix, ``s`` the next 64
        and ``v`` everything that follows.

    Raises:
        AssertionError: If a response has no ``result`` or the result does not
            start with ``0x``.
    """
    method = "wallet_signMessage"  # identical for every hash, so set it once
    tx_signatures = {}

    # ``tx_hash`` rather than ``hash`` so the builtin is not shadowed.
    for tx_hash in hashes:
        params = [tx_hash, user_1.address, option.password]
        response = rpc_client.rpc_valid_request(method, params)

        result = response.json().get("result")
        assert result and result.startswith("0x"), f"Invalid transaction signature for hash {tx_hash}: {result}"

        tx_signature = result[2:]  # strip the "0x" prefix
        tx_signatures[tx_hash] = {
            "r": tx_signature[:64],
            "s": tx_signature[64:128],
            "v": tx_signature[128:],
        }
    return tx_signatures
|
|
|
|
|
|
def send_router_transactions_with_signatures(rpc_client, uuid, tx_signatures):
    """Submit signed router transactions and return the status event.

    Sends ``wallet_sendRouterTransactionsWithSignatures`` with the route uuid
    and the signatures, then waits for ``wallet.transaction.status-changed``.

    Args:
        rpc_client: Object providing ``rpc_valid_request`` and
            ``wait_for_signal``.
        uuid: Route identifier sent as the ``"uuid"`` request field.
        tx_signatures: Value sent as the ``"Signatures"`` request field.

    Returns:
        The ``"event"`` entry of the status-changed signal.

    Raises:
        AssertionError: If the event's ``status`` is not ``"Success"``.
    """
    request_payload = {"uuid": uuid, "Signatures": tx_signatures}
    rpc_client.rpc_valid_request("wallet_sendRouterTransactionsWithSignatures", [request_payload])

    status_signal = rpc_client.wait_for_signal("wallet.transaction.status-changed")
    status_event = status_signal["event"]
    assert status_event["status"] == "Success"
    return status_event
|
|
|
|
|
|
def send_router_transaction(rpc_client, **kwargs):
    """Run the full router flow: suggest routes, build, sign, then send.

    Args:
        rpc_client: Client forwarded unchanged to every step.
        **kwargs: Forwarded to ``get_suggested_routes`` only.

    Returns:
        A dict with the result of each step under the keys ``"routes"``,
        ``"build_tx"``, ``"tx_signatures"`` and ``"tx_status"``.
    """
    suggested = get_suggested_routes(rpc_client, **kwargs)
    # The uuid reported with the suggested routes identifies the route for
    # both the build step and the send step.
    route_uuid = suggested["Uuid"]

    built = build_transactions_from_route(rpc_client, route_uuid)
    hashes_to_sign = built["signingDetails"]["hashes"]
    signatures = sign_messages(rpc_client, hashes_to_sign)
    status = send_router_transactions_with_signatures(rpc_client, route_uuid, signatures)

    outcome = {
        "routes": suggested,
        "build_tx": built,
        "tx_signatures": signatures,
        "tx_status": status,
    }
    return outcome
|