e2e: LambdaTest trial

This commit is contained in:
Yevheniia Berdnyk 2024-10-24 18:59:53 +03:00
parent c0ae299ef9
commit eac2b5ea44
No known key found for this signature in database
18 changed files with 161 additions and 135 deletions

View File

@ -17,6 +17,7 @@ eth-account==0.7.0
eth-hash==0.3.2
eth-keys
execnet==1.7.1
filelock==3.16.1
flaky==3.7.0
future==0.18.2
hexbytes==0.2.2

View File

@ -5,6 +5,7 @@ import re
import urllib
from hashlib import md5
from support.lambda_test import get_session_info
from support.test_data import SingleTestData
@ -96,17 +97,12 @@ class BaseTestReport:
failed.append(test)
return passed, failed, xfailed
def get_sauce_token(self, job_id):
return hmac.new(bytes(self.sauce_username + ":" + self.sauce_access_key, 'latin-1'),
bytes(job_id, 'latin-1'), md5).hexdigest()
# def get_sauce_token(self, job_id):
# return hmac.new(bytes(self.sauce_username + ":" + self.sauce_access_key, 'latin-1'),
# bytes(job_id, 'latin-1'), md5).hexdigest()
def get_sauce_job_url(self, job_id, first_command=0):
token = self.get_sauce_token(job_id)
from tests.conftest import apibase
url = 'https://%s/jobs/%s?auth=%s' % (apibase, job_id, token)
if first_command > 0:
url += "#%s" % first_command
return url
def get_lambda_test_job_url(self, job_id, first_command=0):
return "https://appautomation.lambdatest.com/test?testID=" + get_session_info(job_id)['test_id']
@staticmethod
def get_jenkins_link_to_rerun_e2e(branch_name="develop", pr_id="", tr_case_ids=""):

View File

@ -166,7 +166,7 @@ class GithubHtmlReport(BaseTestReport):
first_command = 0
else:
first_command = 0
html += "<li><a href=\"%s\">Steps, video, logs</a></li>" % self.get_sauce_job_url(job_id, first_command)
html += "<li><a href=\"%s\">Steps, video, logs</a></li>" % self.get_lambda_test_job_url(job_id, first_command)
# if test_run.error:
# html += "<li><a href=\"%s\">Failure screenshot</a></li>" % self.get_sauce_final_screenshot_url(job_id)
html += "</ul></p>"

View File

@ -22,7 +22,18 @@ def update_session(session_id, session_name, status):
resp = session.get(
url="https://mobile-api.lambdatest.com/mobile-automation/api/v1/sessions/%s" % session_id,
data={
"name": session_name #, "status_ind": status
"name": session_name # , "status_ind": status
}
)
assert resp.status_code == 200
def get_session_info(session_id):
resp = session.get(url="https://mobile-api.lambdatest.com/mobile-automation/api/v1/sessions/%s" % session_id)
assert resp.status_code == 200
return resp.json()['data']
def upload_image():
'curl -u "yevheniia:fZaXHEAFEWOVCZLnSVrwHY11eJGsWAknibtG572PiZsvT1h57V" -X POST https://mobile-mgm.lambdatest.com/mfs/v1.0/media/upload -F media_file=@/Users/yberdnyk/Downloads/aaa.png -F type=image -F custom_id=SampleImage'
pass

View File

@ -166,8 +166,8 @@ class TestrailReport(BaseTestReport):
else:
first_command = 0
try:
devices += "# [Device %d](%s) \n" % (i + 1, self.get_sauce_job_url(job_id=device,
first_command=first_command))
devices += "# [Device %d](%s) \n" % (i + 1, self.get_lambda_test_job_url(job_id=device,
first_command=first_command))
except KeyError:
devices += "# Device %s: SauceLabs session was not found \n" % (i + 1)
comment = str()
@ -266,7 +266,7 @@ class TestrailReport(BaseTestReport):
first_command = 0
else:
first_command = 0
job_url = self.get_sauce_job_url(job_id=job_id, first_command=first_command)
job_url = self.get_lambda_test_job_url(job_id=job_id, first_command=first_command)
case_info = "Logs for device %d: [steps](%s), [failure screenshot](%s)" \
% (f, job_url, self.get_sauce_final_screenshot_url(job_id))

View File

@ -12,15 +12,13 @@ import requests
from appium import webdriver
from appium.options.common import AppiumOptions
from appium.webdriver.common.mobileby import MobileBy
from sauceclient import SauceException
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, WebDriverException
from urllib3.exceptions import MaxRetryError, ProtocolError
from support.api.network_api import NetworkApi
from tests import test_suite_data, start_threads, appium_container, pytest_config_global, transl
from tests.conftest import sauce_username, sauce_access_key, apibase, github_report, run_name, option, \
lambda_test_username, lambda_test_access_key
from tests.conftest import sauce_username, sauce_access_key, apibase, github_report, run_name, lambda_test_username, \
lambda_test_access_key
executor_sauce_lab = 'https://%s:%s@ondemand.%s:443/wd/hub' % (sauce_username, sauce_access_key, apibase)
executor_lambda_test = 'https://%s:%s@mobile-hub.lambdatest.com/wd/hub' % (lambda_test_username, lambda_test_access_key)
@ -126,16 +124,21 @@ def get_lambda_test_capabilities_emulator():
"w3c": True,
"platformName": "android",
"deviceName": "Pixel 6",
"appiumVersion": "2.11.0",
"appiumVersion": "latest",
"platformVersion": "14",
"app": "lt://APP10160522181729681886587724", #option.lambda_test_apk_url,
"app": pytest_config_global['lt_apk_url'],
"devicelog": True,
"visual": True,
# "network": True,
"video": True,
"build": run_name,
"name": test_suite_data.current_test.group_name,
"idleTimeout": 1000
"idleTimeout": 1000,
# "enableImageInjection": True,
"uploadMedia": ["lt://MEDIA2b3e34e2b0ee4928b9fc38c603f98191",], # "lt://MEDIAcfc1b4f1af0740759254404186bbe4f1"]
},
"appium:options": {
"automationName": "UiAutomator2",
"hideKeyboard": True
}
}
options = AppiumOptions()
@ -165,9 +168,9 @@ def pull_requests_log(driver):
class AbstractTestCase:
__metaclass__ = ABCMeta
def print_sauce_lab_info(self, driver):
def print_lt_session_info(self, driver):
sys.stdout = sys.stderr
print("SauceOnDemandSessionID=%s job-name=%s" % (driver.session_id, run_name))
print("LambdaTestSessionID=%s job-name=%s" % (driver.session_id, run_name))
def get_translation_by_key(self, key):
return transl[key]
@ -220,6 +223,16 @@ class Driver(webdriver.Remote):
def fail(self, text: str):
pytest.fail('Device %s: %s' % (self.number, text))
def update_lt_session_status(self, index, status):
data = {
"action": "setTestStatus",
"arguments": {
"status": status,
"remark": "Device %s" % index
}
}
self.execute_script("lambda-hook: %s" % str(data).replace("'", "\""))
class Errors(object):
def __init__(self):
@ -254,7 +267,7 @@ class SingleDeviceTestCase(AbstractTestCase):
def teardown_method(self, method):
if self.environment == 'sauce':
self.print_sauce_lab_info(self.driver)
self.print_lt_session_info(self.driver)
try:
self.add_alert_text_to_report(self.driver)
geth_content = pull_geth(self.driver)
@ -316,7 +329,7 @@ class SauceMultipleDeviceTestCase(AbstractTestCase):
geth_names, geth_contents = [], []
for driver in self.drivers:
try:
self.print_sauce_lab_info(self.drivers[driver])
self.print_lt_session_info(self.drivers[driver])
self.add_alert_text_to_report(self.drivers[driver])
geth_names.append(
'%s_geth%s.log' % (test_suite_data.current_test.name, str(self.drivers[driver].number)))
@ -354,18 +367,19 @@ def create_shared_drivers(quantity):
drivers,
command_executor=executor_lambda_test,
options=get_lambda_test_capabilities_emulator()))
# options=get_lambda_test_capabilities_real_device()))
if len(drivers) < quantity:
test_suite_data.current_test.testruns[-1].error = "Not all %s drivers are created" % quantity
for i in range(quantity):
test_suite_data.current_test.testruns[-1].jobs[drivers[i].session_id] = i + 1
drivers[i].implicitly_wait(implicit_wait)
drivers[i].update_settings({"enforceXPath1": True})
return drivers, loop
except (MaxRetryError, AttributeError) as e:
test_suite_data.current_test.testruns[-1].error = str(e)
for _, driver in drivers.items():
for i, driver in drivers.items():
try:
driver.update_lt_session_status(i + 1, "failed")
driver.quit()
except (WebDriverException, AttributeError):
pass
@ -410,8 +424,9 @@ class SauceSharedMultipleDeviceTestCase(AbstractTestCase):
def setup_method(self, method):
if not self.drivers:
pytest.fail(test_suite_data.current_test.testruns[-1].error)
# for _, driver in self.drivers.items():
# driver.execute_script("sauce:context=Started %s" % method.__name__)
for _, driver in self.drivers.items():
driver.execute_script("lambda-testCase-start=%s" % method.__name__)
driver.log_event("appium", "Started %s" % method.__name__)
jobs = test_suite_data.current_test.testruns[-1].jobs
if not jobs:
for index, driver in self.drivers.items():
@ -423,7 +438,7 @@ class SauceSharedMultipleDeviceTestCase(AbstractTestCase):
log_names, log_contents = [], []
for driver in self.drivers:
try:
self.print_sauce_lab_info(self.drivers[driver])
self.print_lt_session_info(self.drivers[driver])
self.add_alert_text_to_report(self.drivers[driver])
log_names.append(
'%s_geth%s.log' % (test_suite_data.current_test.name, str(self.drivers[driver].number)))
@ -451,7 +466,6 @@ class SauceSharedMultipleDeviceTestCase(AbstractTestCase):
@classmethod
def teardown_class(cls):
from tests.conftest import sauce
requests_session = requests.Session()
requests_session.auth = (lambda_test_username, lambda_test_access_key)
if test_suite_data.tests[0].testruns[-1].error and 'setup failed' in test_suite_data.tests[0].testruns[
@ -467,20 +481,13 @@ class SauceSharedMultipleDeviceTestCase(AbstractTestCase):
log_names.append('%s_geth%s.log' % (cls.__name__, i))
log_contents.append(pull_requests_log(driver=driver))
log_names.append('%s_requests%s.log' % (cls.__name__, i))
session_id = driver.session_id
from support.lambda_test import update_session
try:
# sauce.jobs.update_job(username=sauce_username, job_id=session_id, name=cls.__name__)
update_session(
session_id=session_id,
session_name=cls.__name__,
status="failed" if group_setup_failed else "passed"
)
except (RemoteDisconnected, SauceException, requests.exceptions.ConnectionError):
pass
lt_session_status = "failed"
else:
lt_session_status = "passed"
try:
driver.update_lt_session_status(i + 1, lt_session_status)
driver.quit()
except WebDriverException:
except (WebDriverException, RemoteDisconnected):
pass
# url = 'https://api.%s/rest/v1/%s/jobs/%s/assets/%s' % (apibase, sauce_username, session_id, "log.json")
# try:

View File

@ -1,3 +1,4 @@
import json
import os
import re
import signal
@ -11,6 +12,7 @@ from os import environ
import pytest
import requests
from _pytest.runner import runtestprotocol
from filelock import FileLock
from requests.exceptions import ConnectionError as c_er
import tests
@ -145,7 +147,6 @@ def pytest_addoption(parser):
@dataclass
class Option:
datacenter: str = None
lambda_test_apk_url: str = None
option = Option()
@ -156,9 +157,6 @@ sauce = None
run_name = None
# lambda_test_apk_url = None
def is_master(config):
return not hasattr(config, 'workerinput')
@ -252,7 +250,6 @@ def pytest_configure(config):
testrail_report = TestrailReport()
from support.github_report import GithubHtmlReport
global github_report
from saucelab_api_client.saucelab_api_client import SauceLab
github_report = GithubHtmlReport()
tests.pytest_config_global = vars(config.option)
config.addinivalue_line("markers", "testrail_id(name): empty")
@ -264,8 +261,6 @@ def pytest_configure(config):
else:
raise NotImplementedError("Unknown SauceLabs datacenter")
# global sauce
# sauce = SauceLab('https://api.' + apibase + '/', sauce_username, sauce_access_key)
if config.getoption('log_steps'):
import logging
logging.basicConfig(level=logging.INFO)
@ -281,18 +276,6 @@ def pytest_configure(config):
else:
run_name = get_run_name(config, new_one=False)
if is_master(config):
apk_src = config.getoption('apk')
if apk_src.startswith('http'):
apk_path = _download_apk(apk_src)
else:
apk_path = apk_src
# global lambda_test_apk_url
option.lambda_test_apk_url = _upload_and_check_response(apk_path)
if apk_src.startswith('http'):
os.remove(apk_path)
if not is_master(config):
return
@ -308,8 +291,23 @@ def pytest_configure(config):
)
def pytest_configure_node(node):
node.workerinput['lambda_test_apk_url'] = node.config.option.lambda_test_apk_url
@pytest.fixture(scope='session', autouse=True)
def upload_apk(tmp_path_factory):
fn = tmp_path_factory.getbasetemp().parent / "lt_apk.json"
with FileLock(str(fn) + ".lock"):
if fn.is_file():
data = json.loads(fn.read_text())
tests.pytest_config_global['lt_apk_url'] = data['lambda_test_apk_url']
else:
apk_src = tests.pytest_config_global['apk']
if apk_src.startswith('http'):
apk_path = _download_apk(apk_src)
else:
apk_path = apk_src
tests.pytest_config_global['lt_apk_url'] = _upload_and_check_response(apk_path)
fn.write_text(json.dumps({'lambda_test_apk_url': tests.pytest_config_global['lt_apk_url']}))
if apk_src.startswith('http'):
os.remove(apk_path)
def pytest_unconfigure(config):

View File

@ -3,7 +3,6 @@ import random
import emoji
import pytest
from _pytest.outcomes import Failed
from appium.webdriver.connectiontype import ConnectionType
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from tests import marks, run_in_parallel, transl
@ -88,14 +87,13 @@ class TestOneToOneChatMultipleSharedDevicesNewUi(MultipleSharedDeviceTestCase):
message_sender.emojis_below_message(emoji="love").wait_for_element_text(2)
self.device_1.just_fyi("Check emojis info")
message_sender.emojis_below_message(emoji="love").long_press_until_element_is_shown(
self.chat_1.authors_for_reaction(emoji="love"))
message_sender.emojis_below_message(emoji="love").long_press_without_release()
if not self.chat_1.user_list_element_by_name(
self.username_1).is_element_displayed() or not self.chat_1.user_list_element_by_name(
self.username_2).is_element_displayed():
self.errors.append("Incorrect users are shown for 'love' reaction.")
self.chat_1.authors_for_reaction(emoji="thumbs-up").click()
self.chat_1.authors_for_reaction(emoji="thumbs-up").double_click()
if not self.chat_1.user_list_element_by_name(
self.username_1).is_element_displayed() or self.chat_1.user_list_element_by_name(
self.username_2).is_element_displayed():
@ -233,8 +231,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUi(MultipleSharedDeviceTestCase):
self.errors.append("Can pin more than 3 messages in chat")
else:
unpin_element = self.chat_1.element_by_translation_id('unpin-from-chat')
self.chat_1.pinned_messages_list.message_element_by_text(self.message_2).long_press_element(
element_to_release_on=unpin_element)
self.chat_1.pinned_messages_list.message_element_by_text(self.message_2).long_press_without_release()
self.home_1.just_fyi("Unpin one message so that another could be pinned")
unpin_element.click_until_absense_of_element(desired_element=unpin_element)
self.chat_1.pin_message(self.message_4, 'pin-to-chat')
@ -255,7 +252,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUi(MultipleSharedDeviceTestCase):
pinned_message = self.chat_1.pinned_messages_list.message_element_by_text(self.message_4)
unpin_element = self.chat_1.element_by_translation_id("unpin-from-chat")
pinned_message.long_press_element(element_to_release_on=unpin_element)
pinned_message.long_press_without_release()
unpin_element.click_until_absense_of_element(unpin_element)
# try:
# self.chat_2.chat_element_by_text(self.message_4).pinned_by_label.wait_for_invisibility_of_element()
@ -416,7 +413,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUi(MultipleSharedDeviceTestCase):
self.chat_1.just_fyi("Device 1 sends an image")
image_description = "test image"
self.chat_1.send_images_with_description(description=image_description, indexes=[2])
self.chat_1.send_images_with_description(description=image_description, indexes=[0])
self.chat_2.just_fyi("Device 2 checks image message")
if not self.chat_2.chat_element_by_text(image_description).is_element_displayed(30):
@ -486,7 +483,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUi(MultipleSharedDeviceTestCase):
self.chat_2.send_message(message_after_edit_1_1)
self.chat_2.chat_element_by_text(message_after_edit_1_1).wait_for_status_to_be("Delivered")
chat_1_element = self.chat_1.chat_element_by_text(message_after_edit_1_1)
chat_1_element.long_press_element()
chat_1_element.long_press_without_release()
for action in ("edit", "delete-for-everyone"):
if self.chat_1.element_by_translation_id(action).is_element_displayed():
self.errors.append('Option to %s someone else message available!' % action)
@ -537,6 +534,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUi(MultipleSharedDeviceTestCase):
@pytest.mark.xdist_group(name="new_six_2")
@marks.nightly
@marks.lt
class TestOneToOneChatMultipleSharedDevicesNewUiTwo(MultipleSharedDeviceTestCase):
def prepare_devices(self):
@ -590,7 +588,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUiTwo(MultipleSharedDeviceTestCase
def test_1_1_chat_is_shown_message_sent_delivered_from_offline(self):
self.home_1.just_fyi('Turn on airplane mode and check that offline status is shown on home view')
for home in self.homes:
home.driver.set_network_connection(ConnectionType.AIRPLANE_MODE)
home.driver.execute_script("updateNetworkProfile=offline")
# Not implemented yet
# self.home_1.connection_offline_icon.wait_and_click(20)
@ -613,7 +611,7 @@ class TestOneToOneChatMultipleSharedDevicesNewUiTwo(MultipleSharedDeviceTestCase
self.home_2.just_fyi('Device2 goes back online and checks that status of the message is changed to "delivered"')
for home in self.homes:
home.driver.set_network_connection(ConnectionType.ALL_NETWORK_ON)
home.driver.execute_script("updateNetworkProfile=default")
self.home_1.just_fyi('Device1 goes back online and checks that 1-1 chat will be fetched')
if not self.chat_1.chat_element_by_text(message_1).is_element_displayed(120):
@ -646,13 +644,13 @@ class TestOneToOneChatMultipleSharedDevicesNewUiTwo(MultipleSharedDeviceTestCase
self.chat_1.just_fyi("Unmute chat")
self.chat_1.navigate_back_to_home_view()
chat.long_press_element()
chat.long_press_without_release()
if self.home_1.mute_chat_button.text != transl["unmute-chat"]:
self.errors.append("Chat is not muted")
expected_text = "Muted until you turn it back on"
if not self.home_1.element_by_text(expected_text).is_element_displayed():
self.errors.append("Text '%s' is not shown for muted chat" % expected_text)
self.home_1.mute_chat_button.click()
self.home_1.mute_chat_button.double_click()
unmuted_message = "after unmute"
self.chat_2.send_message(unmuted_message)

View File

@ -2,7 +2,6 @@ import datetime
import pytest
from _pytest.outcomes import Failed
from appium.webdriver.connectiontype import ConnectionType
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from tests import marks, run_in_parallel, transl
@ -128,14 +127,13 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
)))
self.chats[0].just_fyi("Admin checks info about voted users")
self.chats[0].chat_element_by_text(message).emojis_below_message(
emoji="thumbs-up").long_press_until_element_is_shown(self.chats[0].authors_for_reaction(emoji="thumbs-up"))
self.chats[0].chat_element_by_text(message).emojis_below_message(emoji="thumbs-up").long_press_without_release()
if not self.chats[0].user_list_element_by_name(
self.usernames[1]).is_element_displayed() or not self.chats[0].user_list_element_by_name(
self.usernames[2]).is_element_displayed():
self.errors.append("Incorrect users are shown for 'thumbs-up' reaction.")
self.chats[0].authors_for_reaction(emoji="love").click()
self.chats[0].authors_for_reaction(emoji="love").double_click()
if not self.chats[0].user_list_element_by_name(
self.usernames[1]).is_element_displayed() or self.chats[0].user_list_element_by_name(
self.usernames[2]).is_element_displayed():
@ -204,14 +202,13 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
chat.navigate_back_to_home_view()
self.chats[0].just_fyi("Admin checks info about voted users after relogin")
message_element.emojis_below_message(
emoji="thumbs-up").long_press_until_element_is_shown(self.chats[0].authors_for_reaction(emoji="thumbs-up"))
message_element.emojis_below_message(emoji="thumbs-up").long_press_without_release()
if self.chats[0].user_list_element_by_name(
self.usernames[1]).is_element_displayed() or not self.chats[0].user_list_element_by_name(
self.usernames[2]).is_element_displayed():
self.errors.append("Incorrect users are shown for 'thumbs-up' reaction after relogin.")
self.chats[0].authors_for_reaction(emoji="love").click()
self.chats[0].authors_for_reaction(emoji="love").double_click()
if not self.chats[0].user_list_element_by_name(
self.usernames[1]).is_element_displayed() or self.chats[0].user_list_element_by_name(
self.usernames[2]).is_element_displayed():
@ -315,12 +312,12 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
self.homes[0].just_fyi("Put admin device to offline and send messages from members")
self.homes[0].navigate_back_to_home_view()
app_package = self.drivers[0].current_package
self.homes[0].driver.set_network_connection(ConnectionType.AIRPLANE_MODE)
self.homes[0].driver.execute_script("updateNetworkProfile=offline")
self.chats[1].send_message(message_1)
self.chats[2].send_message(message_2)
self.homes[0].just_fyi("Put admin device to online and check that messages and PNs will be fetched")
self.homes[0].driver.set_network_connection(ConnectionType.ALL_NETWORK_ON)
self.homes[0].driver.execute_script("updateNetworkProfile=default")
self.homes[0].connection_offline_icon.wait_for_invisibility_of_element(60)
self.homes[0].open_notification_bar()
for message in (message_1, message_2):
@ -366,10 +363,11 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
self.errors.append("Message 1 is not pinned in group chat!")
self.chats[0].just_fyi("Check that non admin user can not unpin messages")
self.chats[1].chat_element_by_text(self.message_1).long_press_element()
self.chats[1].chat_element_by_text(self.message_1).long_press_without_release()
if self.chats[1].element_by_translation_id("unpin-from-chat").is_element_displayed():
self.errors.append("Unpin option is available for non-admin user")
self.chats[1].tap_by_coordinates(500, 100)
self.chats[1].tap_by_coordinates(500, 100)
# not implemented yet :
@ -398,8 +396,7 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
self.chats[0].pin_message(self.message_4, 'pin-to-chat')
self.chats[0].view_pinned_messages_button.click_until_presence_of_element(self.chats[0].pinned_messages_list)
unpin_element = self.chats[0].element_by_translation_id('unpin-from-chat')
self.chats[0].pinned_messages_list.message_element_by_text(self.message_2).long_press_element(
element_to_release_on=unpin_element)
self.chats[0].pinned_messages_list.message_element_by_text(self.message_2).long_press_without_release()
unpin_element.click_until_absense_of_element(desired_element=unpin_element)
self.chats[0].chat_element_by_text(self.message_4).click()
self.chats[0].pin_message(self.message_4, 'pin-to-chat')
@ -449,7 +446,7 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
"Muted until %s %s" % (exp_time.strftime('%H:%M'), "today" if current_time.hour < 23 else "tomorrow") for
exp_time in expected_times]
chat = self.homes[1].get_chat(self.chat_name)
chat.long_press_element()
chat.long_press_without_release()
if self.homes[1].mute_chat_button.text != transl["unmute-chat"]:
pytest.fail("Chat is not muted")
current_text = self.homes[1].mute_chat_button.unmute_caption_text
@ -485,9 +482,9 @@ class TestGroupChatMultipleDeviceMergedNewUI(MultipleSharedDeviceTestCase):
self.chats[1].navigate_back_to_home_view()
self.chats[1].just_fyi("Member 1 unmutes the chat")
chat.long_press_element()
self.homes[1].mute_chat_button.click()
chat.long_press_element()
chat.long_press_without_release()
self.homes[1].mute_chat_button.double_click()
chat.long_press_without_release()
if self.homes[1].element_starts_with_text("Muted until").is_element_displayed():
self.errors.append("Chat is still muted after being unmuted")
self.errors.verify_no_errors()

View File

@ -4,7 +4,6 @@ import random
import emoji
import pytest
from _pytest.outcomes import Failed
from appium.webdriver.connectiontype import ConnectionType
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException
from tests import marks, run_in_parallel, pytest_config_global, transl
@ -702,14 +701,14 @@ class TestCommunityMultipleDeviceMerged(MultipleSharedDeviceTestCase):
# self.home_1.chats_tab.click()
# if not self.home_1.element_by_translation_id("no-messages").is_element_displayed():
# self.errors.append("1-1 chat from blocked user is not removed and messages home is not empty!")
self.chat_1.driver.set_network_connection(ConnectionType.AIRPLANE_MODE)
self.chat_1.driver.execute_script("updateNetworkProfile=offline")
self.home_2.just_fyi('Send message to public chat while device 1 is offline')
message_blocked, message_unblocked = "Message from blocked user", "Hurray! unblocked"
self.channel_2.send_message(message_blocked)
self.chat_1.just_fyi('Check that new messages from blocked user are not delivered')
self.chat_1.driver.set_network_connection(ConnectionType.ALL_NETWORK_ON)
self.chat_1.driver.execute_script("updateNetworkProfile=default")
self.home_1.communities_tab.click()
self.home_1.get_chat(self.community_name, community=True).click()
self.home_1.get_chat(self.channel_name, community_channel=True).click()
@ -818,12 +817,12 @@ class TestCommunityMultipleDeviceMerged(MultipleSharedDeviceTestCase):
self.channel_1.driver.fail("Message '%s' was not received" % message)
self.channel_2.just_fyi("Turning on airplane mode and editing/deleting messages")
self.channel_2.driver.set_network_connection(ConnectionType.AIRPLANE_MODE)
self.channel_2.driver.execute_script("updateNetworkProfile=offline")
message_after_edit = "text after edit"
self.channel_2.edit_message_in_chat(message_to_edit, message_after_edit)
self.channel_2.delete_message_in_chat(message_to_delete)
self.channel_2.just_fyi("Turning on network connection")
self.channel_2.driver.set_network_connection(ConnectionType.ALL_NETWORK_ON)
self.channel_2.driver.execute_script("updateNetworkProfile=default")
self.channel_1.just_fyi("Receiver is checking if messages were updated and deleted")
if not self.channel_1.chat_element_by_text(message_after_edit).is_element_displayed(30):

View File

@ -7,6 +7,7 @@ from views.sign_in_view import SignInView
@pytest.mark.xdist_group(name="new_one_1")
@marks.nightly
@marks.lt
class TestDeepLinksOneDevice(MultipleSharedDeviceTestCase):
def prepare_devices(self):

View File

@ -182,7 +182,7 @@ class TestFallbackMultipleDevice(MultipleSharedDeviceTestCase):
self.sign_in_2.continue_button.click()
if not self.sign_in_2.profile_title_input.is_element_displayed():
self.errors.append("Can't recover an access with a valid passphrase")
self.sign_in_2.click_system_back_button()
self.sign_in_2.click_system_back_button(times=2)
self.sign_in_2.just_fyi("Device 2: try recovering an account which is already synced")
self.sign_in_2.passphrase_edit_box.clear()

View File

@ -15,6 +15,7 @@ from views.sign_in_view import SignInView
@marks.nightly
@marks.secured
@marks.smoke
@marks.lt
class TestWalletMultipleDevice(MultipleSharedDeviceTestCase):
def prepare_devices(self):
@ -193,6 +194,7 @@ class TestWalletMultipleDevice(MultipleSharedDeviceTestCase):
@marks.nightly
@marks.secured
@marks.smoke
@marks.lt
class TestWalletOneDevice(MultipleSharedDeviceTestCase):
def prepare_devices(self):

View File

@ -10,6 +10,9 @@ from PIL import Image, ImageChops, ImageStat
from appium.webdriver.common.mobileby import MobileBy
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException, TimeoutException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.actions import interaction
from selenium.webdriver.common.actions.action_builder import ActionBuilder
from selenium.webdriver.common.actions.pointer_input import PointerInput
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
@ -324,9 +327,21 @@ class BaseElement(object):
action.click_and_hold(element).perform()
time.sleep(2)
if element_to_release_on:
action.release(element_to_release_on.find_element()).perform()
action.release(element_to_release_on.find_element())
action.perform()
else:
action.release(element).perform()
action.release(element)
action.perform()
# actions = ActionChains(self.driver)
# actions.w3c_actions = ActionBuilder(self, mouse=PointerInput(interaction.POINTER_TOUCH, "touch"))
# actions.w3c_actions.pointer_action.click_and_hold(element)
# actions.w3c_actions.pointer_action.release()
# actions.perform()
def long_press_without_release(self):
action = ActionChains(self.driver)
action.click_and_hold(self.find_element()).perform()
def long_press_until_element_is_shown(self, expected_element):
element = self.find_element()

View File

@ -411,6 +411,7 @@ class BaseView(object):
def just_fyi(self, some_str):
self.driver.info('# STEP: %s' % some_str, device=False)
# self.driver.execute_script("sauce:context=STEP: %s" % some_str)
self.driver.log_event("appium", "STEP: %s" % some_str)
def hide_keyboard_if_shown(self):
if self.driver.is_keyboard_shown():

View File

@ -4,6 +4,7 @@ from datetime import datetime, timedelta
from time import sleep
import dateutil.parser
from appium.webdriver.common.appiumby import AppiumBy
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, \
InvalidElementStateException
from selenium.webdriver import ActionChains
@ -232,11 +233,14 @@ class ChatElementByText(Text):
def replied_message_text(self):
class RepliedMessageText(Text):
def __init__(self, driver, parent_locator: str):
super().__init__(driver, prefix=parent_locator,
xpath="/preceding::android.widget.TextView[@content-desc='quoted-message']")
super().__init__(driver,
# prefix=parent_locator,
# xpath="/preceding::android.widget.TextView[@content-desc='quoted-message']"
accessibility_id='quoted-message')
try:
return RepliedMessageText(self.driver, self.message_locator).text
# return RepliedMessageText(self.driver, self.message_locator).text
return self.find_element().find_element(by=AppiumBy.ACCESSIBILITY_ID, value='quoted-message').text
except NoSuchElementException:
return ''
@ -1011,13 +1015,13 @@ class ChatView(BaseView):
def pin_message(self, message, action="pin"):
self.driver.info("Looking for message '%s' pin" % message)
element = self.element_by_translation_id(action)
self.chat_element_by_text(message).long_press_until_element_is_shown(element)
self.chat_element_by_text(message).long_press_without_release()
element.click_until_absense_of_element(element)
def edit_message_in_chat(self, message_to_edit, message_to_update):
self.driver.info("Looking for message '%s' to edit it" % message_to_edit)
self.chat_element_by_text(message_to_edit).message_body.long_press_element()
self.element_by_translation_id("edit-message").click()
self.chat_element_by_text(message_to_edit).message_body.long_press_without_release()
self.element_by_translation_id("edit-message").double_click()
self.chat_message_input.clear()
self.chat_message_input.send_keys(message_to_update)
self.send_message_button.click()
@ -1028,37 +1032,31 @@ class ChatView(BaseView):
delete_button = self.element_by_translation_id("delete-for-everyone")
else:
delete_button = self.element_by_translation_id("delete-for-me")
self.chat_element_by_text(message).message_body.long_press_element()
delete_button.click()
self.chat_element_by_text(message).message_body.long_press_without_release()
delete_button.double_click()
def copy_message_text(self, message_text):
self.driver.info("Copying '%s' message via long press" % message_text)
self.chat_element_by_text(message_text).wait_for_visibility_of_element()
self.chat_element_by_text(message_text).long_press_element()
self.element_by_translation_id("copy-text").click()
self.chat_element_by_text(message_text).long_press_without_release()
self.element_by_translation_id("copy-text").double_click()
def quote_message(self, message: str):
self.driver.info("Quoting '%s' message" % message)
element = self.chat_element_by_text(message)
element.wait_for_sent_state()
element.long_press_until_element_is_shown(self.reply_message_button)
self.reply_message_button.click()
element.long_press_without_release()
self.reply_message_button.double_click()
def set_reaction(self, message: str, emoji: str = 'thumbs-up', emoji_message=False):
self.driver.info("Setting '%s' reaction" % emoji)
# Audio message is obvious should be tapped not on audio-scroll-line
# so we tap on its below element as exception here (not the case for link/tag message!)
element = Button(self.driver, accessibility_id='reaction-%s' % emoji)
if message == 'audio':
self.audio_message_in_chat_timer.long_press_element()
if emoji_message:
self.element_by_text_part(message).long_press_without_release()
else:
if not emoji_message:
self.chat_element_by_text(message).long_press_until_element_is_shown(element)
else:
self.element_by_text_part(message).long_press_until_element_is_shown(element)
# old UI
# element = Button(self.driver, accessibility_id='pick-emoji-%s' % key)
element.click()
self.chat_element_by_text(message).long_press_without_release()
element.wait_for_element()
element.double_click()
element.wait_for_invisibility_of_element()
def add_remove_same_reaction(self, message: str, emoji: str = 'thumbs-up'):

View File

@ -545,8 +545,8 @@ class HomeView(BaseView):
def delete_chat_long_press(self, username):
self.driver.info("Deleting chat '%s' by long press" % username)
self.get_chat(username).long_press_element()
self.close_chat_button.click()
self.get_chat(username).long_press_without_release()
self.close_chat_button.double_click()
self.confirm_closing_chat_button.click()
def leave_chat_long_press(self, username):
@ -565,13 +565,15 @@ class HomeView(BaseView):
def mute_chat_long_press(self, chat_name, mute_period="mute-till-unmute", community=False, community_channel=False):
self.driver.info("Muting chat with %s" % chat_name)
self.get_chat(username=chat_name, community=community, community_channel=community_channel).long_press_element()
self.get_chat(username=chat_name, community=community,
community_channel=community_channel).long_press_without_release()
if community:
self.mute_community_button.click()
element = self.mute_community_button
elif community_channel:
self.mute_channel_button.click()
element = self.mute_channel_button
else:
self.mute_chat_button.click()
element = self.mute_chat_button
element.double_click()
self.element_by_translation_id(mute_period).click()
def get_pn(self, pn_text: str):

View File

@ -149,8 +149,8 @@ class WalletView(BaseView):
def send_asset_from_drawer(self, address: str, asset_name: str, amount: float):
asset_element = self.get_asset(asset_name)
asset_element.long_press_element()
self.send_from_drawer_button.click()
asset_element.long_press_without_release()
self.send_from_drawer_button.double_click()
self.address_text_input.send_keys(address)
self.continue_button.click()
self.set_amount(amount)