testing waku setup

This commit is contained in:
mike cullerton 2022-08-19 09:43:48 -04:00
parent 188dfc1a74
commit 13ba5327c6
2 changed files with 109 additions and 1 deletions

View File

@ -94,7 +94,13 @@ class ErrorHandlingService:
@staticmethod
def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
"""WakuHandler."""
...
class WakuMessage:
payload: str
contentTopic: str # Optional
version: int # Optional
timestamp: int # Optional
class FailingService:

View File

@ -1,6 +1,7 @@
"""Test Process Api Blueprint."""
import io
import json
import requests
import time
from typing import Any
from typing import Dict
@ -1304,3 +1305,104 @@ class TestProcessApi(BaseTest):
# fs_spec = process_model_service.get_spec('random_fact')
# assert(WorkflowSpecInfoSchema().dump(fs_spec) == json_data)
#
def test_waku_debug_info(self):
debug_info_method = "get_waku_v2_debug_v1_info"
headers = {
"Content-Type": "application/json"
}
rpc_json = {
"jsonrpc": "2.0",
"method": debug_info_method,
"params": [],
"id": "id"
}
request_url = "http://localhost:8545"
rpc_response = requests.post(request_url, headers=headers, json=rpc_json)
rpc_json_text: dict = json.loads(rpc_response.text)
assert isinstance(rpc_json_text, dict)
# assert 'jsonrpc' in rpc_json_text
# assert rpc_json_text['jsonrpc'] == '2.0'
assert 'result' in rpc_json_text
result = rpc_json_text['result']
assert isinstance(result, dict)
assert 'listenAddresses' in result
assert 'enrUri' in result
print("test_call_waku")
def test_send_message(self):
relay_message_method = "post_waku_v2_relay_v1_message"
headers = {
"Content-Type": "application/json"
}
# class WakuMessage:
# payload: str
# contentTopic: str # Optional
# # version: int # Optional
# timestamp: int # Optional
payload = "This is my message"
contentTopic = "myTestTopic"
timestamp = time.time()
waku_relay_message = {
'payload': payload,
'contentTopic': contentTopic,
'timestamp': timestamp
}
# ["", [{"contentTopic":"/waku/2/default-content/proto"}]]
params = ["/waku/2/default-waku/proto", {"message": waku_relay_message}]
rpc_json = {
"jsonrpc": "2.0",
"method": relay_message_method,
"params": params,
"id": 1
}
request_url = "http://localhost:8545"
rpc_response = requests.post(request_url, headers=headers, json=rpc_json)
assert rpc_response.status_code == 200
rpc_json_data: dict = json.loads(rpc_response.text)
assert 'error' in rpc_json_data
assert 'result' in rpc_json_data
assert rpc_json_data['error'] is None
assert rpc_json_data['result'] is True
print("test_send_message")
def test_get_waku_messages(self):
method = "get_waku_v2_store_v1_messages"
headers = {
"Content-Type": "application/json"
}
params = [{"contentTopic": "/waku/2/default-content/proto"}]
rpc_json = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1
}
request_url = "http://localhost:8545"
rpc_response = requests.post(request_url, headers=headers, json=rpc_json)
assert rpc_response.status_code == 200
rpc_json_data: dict = json.loads(rpc_response.text)
assert 'error' in rpc_json_data
assert rpc_json_data['error'] is None
assert 'result' in rpc_json_data
assert isinstance(rpc_json_data['result'], dict)
assert 'messages' in rpc_json_data['result']
assert 'pagingInfo' in rpc_json_data['result']
print("get_waku_messages")