fbarbu15 e109ec1db6
test_: Test private chat messages (#6259)
* test_: private chat messages

* test_: fix

* test_: apply network conditions

* test_: fix low bandwidth command

* test_: unskip remaining tests

* test_: fix test name

* test_: run tests in parallel

* test_: fix deps

* test_: remove dependencies

* test_: unique container name

* test_: increase port range

* test_: fix container cleanup

* test_: prepare for code review

* test_: debug rpc tests

* test_: debug rpc tests

* test_: revert port change to test failures

* test_: try larger port

* test_: address code review

* test_: port checks

* test_: add missing newline

* test_: cleanup all containers

* test_: fix pylint
2025-02-03 13:24:08 +02:00

46 lines
1.7 KiB
Python

from clients.rpc import RpcClient
from clients.services.service import Service
class WakuextService(Service):
    """Convenience wrapper that exposes selected ``wakuext`` RPC calls as methods.

    Every method forwards to ``self.rpc_request`` (not defined in this class;
    presumably inherited from ``Service``) and, unless stated otherwise,
    returns whatever ``response.json()`` yields for that call.
    """

    def __init__(self, client: RpcClient):
        """Bind this service to *client* under the name ``"wakuext"``."""
        # NOTE(review): the second argument looks like the RPC namespace used
        # to build full method names -- confirm against Service.__init__.
        super().__init__(client, "wakuext")

    def send_contact_request(self, contact_id: str, message: str):
        """Call ``sendContactRequest`` with the contact id and message text.

        Returns the JSON body of the RPC response.
        """
        payload = {"id": contact_id, "message": message}
        return self.rpc_request("sendContactRequest", [payload]).json()

    def accept_contact_request(self, request_id: str):
        """Call ``acceptContactRequest`` for the given request id.

        Returns the JSON body of the RPC response.
        """
        payload = {"id": request_id}
        return self.rpc_request("acceptContactRequest", [payload]).json()

    def get_contacts(self):
        """Call ``contacts`` (no parameters) and return the JSON body."""
        return self.rpc_request("contacts").json()

    def send_message(self, contact_id: str, message: str):
        """Call ``sendOneToOneMessage`` with the contact id and message text.

        Returns the JSON body of the RPC response.
        """
        payload = {"id": contact_id, "message": message}
        return self.rpc_request("sendOneToOneMessage", [payload]).json()

    def start_messenger(self):
        """Call ``startMessenger``, tolerating an "already started" error.

        When the response JSON has an ``"error"`` key, its ``code`` must be
        ``-32000`` and its ``message`` must be ``"messenger already started"``;
        any other error fails an assertion. Returns ``None`` in all cases.
        """
        reply = self.rpc_request("startMessenger").json()

        # Nothing to verify when the call succeeded outright.
        if "error" not in reply:
            return

        # The only error accepted here is the "already started" one.
        error = reply["error"]
        assert error["code"] == -32000
        assert error["message"] == "messenger already started"

    def create_group_chat_with_members(self, pubkey_list: list, group_chat_name: str):
        """Call ``createGroupChatWithMembers`` and return the JSON body.

        The positional RPC parameters are ``None``, the chat name, and the
        list of member public keys, in that order.
        """
        # NOTE(review): the meaning of the leading None is not visible in this
        # file -- check the RPC definition before changing it.
        rpc_args = [None, group_chat_name, pubkey_list]
        return self.rpc_request("createGroupChatWithMembers", rpc_args).json()

    def send_group_chat_message(self, group_id: str, message: str):
        """Call ``sendGroupChatMessage`` with the group id and message text.

        Returns the JSON body of the RPC response.
        """
        payload = {"id": group_id, "message": message}
        return self.rpc_request("sendGroupChatMessage", [payload]).json()