mirror of https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-02 22:13:11 +00:00

* Adding more corner cases
* Adding more tests
* Adding more tests
* Final tests added
* Adding mesh test
* Fix failing test on CI

1140 lines
48 KiB
Python
import time

import pytest

from src.env_vars import NODE_1, NODE_2
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.store_response import StoreResponse
from src.node.waku_node import WakuNode
from src.steps.store import StepsStore

logger = get_custom_logger(__name__)

"""
|
|
In those tests we aim to combine multiple protocols/node types and create a more end-to-end scenario
|
|
"""
|
|
|
|
|
|
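# Common topology for most of the tests below: node1 acts as bootstrap node and
# main publisher, and nodes are peered in a chain (node1 <- node2 <- node3).
# Each test then queries every node's store and compares the returned messages.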
class TestStoreSync(StepsStore):
    @pytest.fixture(scope="function", autouse=True)
    def nodes(self):
        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
        self.node2 = WakuNode(NODE_1, f"node2_{self.test_id}")
        self.node3 = WakuNode(NODE_1, f"node3_{self.test_id}")
        self.num_messages = 10

    @pytest.mark.skip("This test does not work as expected, will be fixed by @aya")
    def test_sync_nodes_are_relay(self):
        self.node1.start(store="true", relay="true")
        self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
        self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]

        delay(2)  # wait for the sync to finish

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages
        ), f"Store messages are not equal to each other or not equal to {self.num_messages}"

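    # Same chained topology as the test above, but node2 and node3 also run
    # with store="true", so every node should serve the full message set.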
    def test_sync_nodes_have_store_true(self):
        self.node1.start(store="true", relay="true")
        self.node2.start(store="true", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
        self.node3.start(store="true", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages
        ), f"Store messages are not equal to each other or not equal to {self.num_messages}"

    def test_sync_nodes_are_not_relay_and_have_storenode_set(self):
        self.node1.start(store="true", relay="true")
        self.node2.start(
            store="false",
            store_sync="true",
            relay="false",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.node3.start(
            store="false",
            store_sync="true",
            relay="false",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages
        ), f"Store messages are not equal to each other or not equal to {self.num_messages}"

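    # Messages are published via lightpush rather than relay; store sync should
    # still replicate them across all three stores.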
    def test_sync_messages_received_via_lightpush(self):
        self.node1.start(store="true", store_sync="true", relay="true", lightpush="true")
        self.node2.start(
            store="true",
            store_sync="true",
            relay="true",
            lightpush="true",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
            lightpushnode=self.node1.get_multiaddr_with_id(),
        )
        self.node3.start(
            store="true",
            store_sync="true",
            relay="true",
            storenode=self.node2.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="lightpush") for _ in range(self.num_messages)]

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages
        ), f"Store messages are not equal to each other or not equal to {self.num_messages}"

    def test_check_sync_when_2_nodes_publish(self):
        self.node1.start(store="true", store_sync="true", relay="true")
        self.node2.start(
            store="true",
            store_sync="true",
            relay="true",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.node3.start(
            store="false",
            store_sync="true",
            relay="false",
            storenode=self.node2.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])

        ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages * 2
        ), f"Store messages are not equal to each other or not equal to {self.num_messages * 2}"

    def test_check_sync_when_all_3_nodes_publish(self):
        self.node1.start(store="true", store_sync="true", relay="true")
        self.node2.start(
            store="true",
            store_sync="true",
            relay="true",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.node3.start(
            store="false",
            store_sync="true",
            relay="true",
            storenode=self.node2.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages * 3
        ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"

    #########################################################

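    # node3 joins only after node1 and node2 have already exchanged messages;
    # once peered and subscribed, store sync should backfill the history it missed.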
    def test_sync_with_one_node_with_delayed_start(self):
        self.node1.start(store="true", store_sync="true", relay="true")
        self.node2.start(
            store="true",
            store_sync="true",
            relay="true",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]

        # start the 3rd node
        self.node3.start(
            store="false",
            store_sync="true",
            relay="true",
            storenode=self.node2.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        delay(1)  # wait for the sync to finish

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages
        ), f"Store messages are not equal to each other or not equal to {self.num_messages}"

    def test_sync_with_nodes_restart__case1(self):
        self.node1.start(store="true", store_sync="true", relay="true")
        self.node2.start(
            store="true",
            store_sync="true",
            relay="true",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.node3.start(
            store="false",
            store_sync="true",
            relay="true",
            storenode=self.node2.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]

        self.node1.restart()
        self.node2.restart()
        self.node3.restart()

        delay(2)  # wait for the sync to finish

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages * 3
        ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"

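    # Unlike case1 (all nodes restart), only node2 is restarted here; after it
    # comes back, the checks expect the full message set on all three nodes.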
    def test_sync_with_nodes_restart__case2(self):
        self.node1.start(store="true", relay="true")
        self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
        self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
        ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]

        self.node2.restart()

        delay(5)  # wait for the sync to finish

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages * 3
        ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"

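    # Volume test: 500 rounds x 3 publishers = 1500 messages, after which every
    # store is drained page by page (page_size=100) and the hash lists compared.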
    def test_high_message_volume_sync(self):
        self.node1.start(store="true", store_sync="true", relay="true")
        self.node2.start(
            store="true",
            store_sync="true",
            relay="true",
            storenode=self.node1.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.node3.start(
            store="false",
            store_sync="true",
            relay="true",
            storenode=self.node2.get_multiaddr_with_id(),
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        expected_message_hash_list = {"nwaku": []}

        for _ in range(500):  # total 1500 messages
            messages = [self.create_message() for _ in range(3)]

            for i, node in enumerate([self.node1, self.node2, self.node3]):
                self.publish_message(sender=node, via="relay", message=messages[i], message_propagation_delay=0.01)

            expected_message_hash_list["nwaku"].extend([self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex") for msg in messages])

        delay(5)  # wait for the sync to finish

        for node in [self.node1, self.node2, self.node3]:
            store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, node)
            response_message_hash_list = []
            while store_response.pagination_cursor is not None:
                cursor = store_response.pagination_cursor
                store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
                for index in range(len(store_response.messages)):
                    response_message_hash_list.append(store_response.message_hash(index))
            assert len(expected_message_hash_list[node.type()]) == len(response_message_hash_list), "Message count mismatch"
            assert expected_message_hash_list[node.type()] == response_message_hash_list, "Message hash mismatch"

    def test_large_message_payload_sync(self):
        self.node1.start(store="true", relay="true")
        self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
        self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        payload_length = 1024 * 100  # base64 encoding adds ~4/3 overhead, so each payload grows to roughly 134 KB

        ml1 = [
            self.publish_message(
                sender=self.node1, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01
            )
            for _ in range(self.num_messages)
        ]
        ml2 = [
            self.publish_message(
                sender=self.node2, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01
            )
            for _ in range(self.num_messages)
        ]
        ml3 = [
            self.publish_message(
                sender=self.node3, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01
            )
            for _ in range(self.num_messages)
        ]

        delay(10)  # wait for the sync to finish

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages * 3
        ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"

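    # Exercises the store sync tuning flags; as used throughout this suite,
    # store_sync_interval (time between sync rounds), store_sync_range (how far
    # back a round looks), and store_sync_relay_jitter are all in seconds.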
    def test_sync_flags(self):
        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=1,
            store_sync_range=10,
            store_sync_relay_jitter=1,
            relay="true",
        )
        self.node2.start(
            store="false",
            store_sync="true",
            store_sync_interval=1,
            store_sync_range=10,
            store_sync_relay_jitter=1,
            relay="true",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.node3.start(
            store="false",
            store_sync="true",
            store_sync_interval=1,
            store_sync_range=10,
            store_sync_relay_jitter=1,
            relay="true",
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.node3.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]

        delay(2)  # wait for the sync to finish

        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
        node1_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
        node2_message = len(self.store_response.messages)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
        node3_message = len(self.store_response.messages)

        assert (
            node1_message == node2_message == node3_message == self.num_messages
        ), f"Store messages are not equal to each other or not equal to {self.num_messages}"

    def test_sync_flags_no_relay_2nodes(self):
        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="true",
        )
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        delay(1)

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]

        delay(20)  # wait for the sync to finish
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)

    def test_sync_flags_node2_start_later(self):
        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="true",
        )
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
        delay(1)
        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        delay(65)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)

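    # node3 never peers with node1 directly (it only bootstraps off node2), so
    # the published messages have to reach it through node2's store sync.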
    def test_store_sync_indirect_node(self):
        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="true",
        )
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )

        self.node3.start(
            store="true",
            store_sync="true",
            store_sync_interval=10,
            store_sync_range=45,
            store_sync_relay_jitter=0,
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node2.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
        delay(65)
        self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)

    def test_store_sync_long_chain(self):
        sync_interval = 10
        sync_range = 120
        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=0,
            relay="true",
            dns_discovery="false",
        )

        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
        self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}")
        self.node6 = WakuNode(NODE_2, f"node6_{self.test_id}")
        self.node7 = WakuNode(NODE_2, f"node7_{self.test_id}")
        self.node8 = WakuNode(NODE_2, f"node8_{self.test_id}")

        extra_nodes = [
            self.node2,
            self.node3,
            self.node4,
            self.node5,
            self.node6,
            self.node7,
            self.node8,
        ]
        prev = self.node1
        for node in extra_nodes:
            node.start(
                store="true",
                store_sync="true",
                store_sync_interval=sync_interval,
                store_sync_range=sync_range,
                store_sync_relay_jitter=0,
                relay="false",
                dns_discovery="false",
            )

            self.add_node_peer(node, [prev.get_multiaddr_with_id()])
            prev = node

        published = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
        delay(sync_interval * 7 + 20)
        self.check_published_message_is_stored(
            page_size=100,
            ascending="true",
            store_node=self.node8,
            messages_to_check=published,
        )

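    # Publishes for longer than one sync_range (90 messages at ~1 msg/s vs a
    # 45 s range) so consecutive sync windows overlap; the set() comparison
    # checks that overlapping rounds neither drop nor duplicate messages.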
    def test_store_sync_overlap_sync_window(self):
        sync_interval = 15
        sync_range = 45
        intervals = 6
        publish_secs = sync_interval * intervals

        self.node1.start(relay="true", store="true", store_sync="true", dns_discovery="false")

        self.node2.start(
            relay="false",
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=0,
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        logger.debug(f"Publishing {publish_secs} messages at ~1 msg/s")
        published_hashes = []
        for _ in range(publish_secs):
            msg = self.publish_message(sender=self.node1, via="relay")
            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))
            delay(0.8)

        logger.debug(f"Waiting {sync_interval * 2} seconds to allow at least two sync rounds")
        delay(sync_interval * 2)

        logger.debug("Querying node2 store for all messages")
        resp = self.get_messages_from_store(self.node2, page_size=2000, ascending="true")
        store_hashes = [resp.message_hash(i) for i in range(len(resp.messages))]

        logger.debug(f"Store returned {len(store_hashes)} messages, published {publish_secs} messages")

        assert len(set(store_hashes)) == publish_secs
        assert set(store_hashes) == set(published_hashes)

    @pytest.mark.timeout(60 * 20)
    def test_query_after_long_time(self):
        sync_range = 150
        backlog_secs = 10 * 60
        publish_delay = 0.8
        sync_interval = 10

        self.node1.start(store="true", relay="true", store_sync="true", dns_discovery="false")
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        logger.debug(f"Publishing {backlog_secs} messages at ~1 msg/s to build backlog")
        published_hashes = []
        for _ in range(backlog_secs):
            msg = self.publish_message(sender=self.node1, via="relay")
            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))
            delay(publish_delay)

        expected_hashes = published_hashes[-sync_range:]

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=0,
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        logger.debug(f"Waiting {sync_interval * 4} s to let node2 finish its first sync")
        delay(sync_interval * 4)

        store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.node2)
        store_hashes = []
        while store_response.pagination_cursor is not None:
            cursor = store_response.pagination_cursor
            store_response = self.get_messages_from_store(self.node2, page_size=100, cursor=cursor, ascending="true")
            for idx in range(len(store_response.messages)):
                store_hashes.append(store_response.message_hash(idx))

        logger.debug(f"Store returned {len(store_hashes)} messages; expected range {len(expected_hashes) - 20} : {len(expected_hashes)}")
        assert len(expected_hashes) >= len(store_hashes) > len(expected_hashes) - 20, "Incorrect number of messages synced"

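    # Simulates a network partition by pausing node2's container while node1
    # keeps publishing; after unpausing, store sync must backfill the gap.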
    @pytest.mark.timeout(60 * 3)
    def test_store_sync_after_partition_under_100_msgs(self):
        sync_interval = 10
        sync_range = 180
        node2up_secs = 20
        node2down_secs = 60
        publish_delay = 0.8
        total_expected = node2up_secs + node2down_secs

        self.node1.start(store="true", relay="true", store_sync="true", dns_discovery="false")
        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=0,
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        published_hashes = []
        for _ in range(node2up_secs):
            m = self.publish_message(sender=self.node1, via="relay")
            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, m, hash_type="hex"))
            delay(publish_delay)

        logger.debug("Pausing node2 container")
        self.node2.pause()

        logger.debug("Publishing while node2 is paused")
        for _ in range(node2down_secs):
            m = self.publish_message(sender=self.node1, via="relay")
            published_hashes.append(self.compute_message_hash(self.test_pubsub_topic, m, hash_type="hex"))
            delay(publish_delay)

        logger.debug("Unpausing node2")
        self.node2.unpause()
        delay(sync_interval * 2)

        resp = self.get_messages_from_store(self.node2, ascending="true", page_size=100)
        store_hashes = [resp.message_hash(i) for i in range(len(resp.messages))]

        logger.debug(f"Node2 store has {len(store_hashes)} messages; expected {total_expected}")
        assert len(store_hashes) == total_expected, "Message count mismatch after partition"
        assert set(store_hashes) == set(published_hashes), "Missing or extra messages after sync"

    def test_store_sync_small_sync_range(self):
        sync_interval = 10
        sync_range = 20
        jitter = 0
        backlog_wait = 60
        publish_count = 3

        self.node1.start(
            store="true",
            store_sync="true",
            relay="true",
            dns_discovery="false",
        )
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        for _ in range(publish_count):
            self.publish_message(sender=self.node1, via="relay")

        time.sleep(backlog_wait)

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="false",  # ensure no gossip path
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        time.sleep(sync_interval * 2)

        resp = self.get_messages_from_store(
            self.node2,
            page_size=100,
            cursor="",
            ascending="true",
        )
        logger.debug("Node-2 local store returned %d messages; expected 0", len(resp.messages))
        assert len(resp.messages) == 0, "Store-Sync unexpectedly fetched messages older than the configured window"

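    # Counterpart to test_store_sync_small_sync_range above: the 25 s backlog
    # falls outside the 20 s sync_range alone, but range + jitter (20 + 25)
    # covers it, so the old messages should be synced. The zero-jitter variant
    # below expects none.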
    def test_store_sync_range_with_jitter_catches_old_messages(self):
        sync_interval = 5
        sync_range = 20
        jitter = 25
        backlog_wait = 25
        publish_count = 3

        self.node1.start(store="true", store_sync="true", relay="true", dns_discovery="false")
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        for _ in range(publish_count):
            self.publish_message(sender=self.node1, via="relay")

        time.sleep(backlog_wait)
        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        time.sleep(sync_interval * 2)

        resp = self.get_messages_from_store(self.node2, page_size=100, cursor="", ascending="true")

        assert len(resp.messages) == publish_count

    def test_store_sync_range_with_zero_jitter(self):
        sync_interval = 5
        sync_range = 20
        jitter = 0
        backlog_wait = 25
        publish_count = 3

        self.node1.start(store="true", store_sync="true", relay="true", dns_discovery="false")
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])

        for _ in range(publish_count):
            self.publish_message(sender=self.node1, via="relay")

        time.sleep(backlog_wait)
        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        time.sleep(sync_interval * 2)

        resp = self.get_messages_from_store(self.node2, page_size=100, cursor="", ascending="true")

        assert len(resp.messages) == 0

    def test_three_store_sync_exchange(self):
        msgs_per_node = 20
        total_expected = msgs_per_node * 3
        sync_interval = 6
        sync_range = 600
        jitter = 0
        publish_delay = 0.01
        wait_cycles = 3

        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="true",
            dns_discovery="false",
        )

        for _ in range(msgs_per_node):
            self.publish_message(sender=self.node1, via="relay")
            time.sleep(publish_delay)

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="true",
            dns_discovery="false",
        )

        for _ in range(msgs_per_node):
            self.publish_message(sender=self.node2, via="relay")
            time.sleep(publish_delay)

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
        self.node3.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="true",
            dns_discovery="false",
        )
        for _ in range(msgs_per_node):
            self.publish_message(sender=self.node3, via="relay")
            time.sleep(publish_delay)

        self.add_node_peer(
            self.node3,
            [self.node1.get_multiaddr_with_id(), self.node2.get_multiaddr_with_id()],
        )

        time.sleep(sync_interval * wait_cycles)
        resp_A = self.get_messages_from_store(self.node1, page_size=200, cursor="", ascending="true", peer_id="")
        logger.debug("Node-A store has %d messages", len(resp_A.messages))
        assert len(resp_A.messages) == total_expected, f"Node A: expected {total_expected}, got {len(resp_A.messages)}"

        resp_B = self.get_messages_from_store(self.node2, page_size=200, cursor="", ascending="true", peer_id="")
        logger.debug("Node-B store has %d messages", len(resp_B.messages))
        assert len(resp_B.messages) == total_expected, f"Node B: expected {total_expected}, got {len(resp_B.messages)}"
        resp_C = self.get_messages_from_store(self.node3, page_size=200, cursor="", ascending="true", peer_id="")
        logger.debug("Node-C store has %d messages", len(resp_C.messages))
        assert len(resp_C.messages) == total_expected, f"Node C: expected {total_expected}, got {len(resp_C.messages)}"

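    # node2 runs with neither store sync nor relay and is never peered, so it
    # should pick up nothing; node3, which does sync, must still end up with
    # the full backlog from node1.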
    @pytest.mark.timeout(240)
    def test_node_without_sync_or_relay_stays_empty(self):
        msgs_to_publish = 30
        sync_interval = 6
        sync_range = 300
        jitter = 0
        publish_delay = 0.01
        wait_cycles = 3
        topic = self.test_pubsub_topic

        self.node1.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="true",
            dns_discovery="false",
        )
        for _ in range(msgs_to_publish):
            self.publish_message(sender=self.node1, via="relay")
            time.sleep(publish_delay)

        self.node2.start(
            # store="false",
            store_sync="false",
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        # self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        self.node3.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="false",
            dns_discovery="false",
            discv5_bootstrap_node=self.node1.get_enr_uri(),
        )
        self.add_node_peer(self.node3, [self.node1.get_multiaddr_with_id()])

        time.sleep(sync_interval * wait_cycles)
        resp3 = self.get_messages_from_store(self.node3, page_size=200, cursor="", ascending="true", peer_id="")
        logger.debug("Node3 store has %d messages, expected %d", len(resp3.messages), msgs_to_publish)
        assert len(resp3.messages) == msgs_to_publish, f"Node3 store mismatch: expected {msgs_to_publish}, got {len(resp3.messages)}"

    def test_continuous_store_sync(self):
        msgs_per_round = 30
        rounds = 3
        sleep_between_rounds = 30
        publish_delay = 0.01
        sync_interval = 6
        sync_range = 600
        jitter = 0

        self.node1.start(
            store="true",
            store_sync="true",
            relay="true",
            dns_discovery="false",
        )

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="false",
            dns_discovery="false",
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        total_published = 0
        for _ in range(rounds):
            for _ in range(msgs_per_round):
                self.publish_message(sender=self.node1, via="relay")
                total_published += 1
                time.sleep(publish_delay)

            time.sleep(sync_interval * 2)

            resp = self.get_messages_from_store(
                self.node2,
                page_size=100,
                cursor="",
                ascending="true",
                peer_id="",
            )
            logger.debug(f"Node-2 store has {len(resp.messages)}/{total_published} messages")
            assert len(resp.messages) == total_published, f"expected {total_published}, got {len(resp.messages)}"

            time.sleep(sleep_between_rounds)

    def test_store_sync_high_jitter_stress(self):
        sync_interval = 10
        sync_range = 120
        jitter = 90
        msgs_per_node = 50
        message_delay = 0.0
        page_size = 100

        nodes = [self.node1, self.node2]

        for n in nodes:
            n.start(
                store="true",
                store_sync="true",
                store_sync_interval=sync_interval,
                store_sync_range=sync_range,
                store_sync_relay_jitter=jitter,
                relay="true",
                dns_discovery="false",
            )

        self.add_node_peer(self.node1, [self.node2.get_multiaddr_with_id()])
        expected_hashes = []
        for _ in range(msgs_per_node):
            msgs = [self.create_message() for _ in nodes]
            for node, msg in zip(nodes, msgs):
                self.publish_message(
                    sender=node,
                    via="relay",
                    message=msg,
                    message_propagation_delay=message_delay,
                )
                expected_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))

        delay(120)

        for node in nodes:
            store_resp = self.get_messages_from_store(node, page_size=page_size, ascending="true")
            retrieved_hashes = [store_resp.message_hash(i) for i in range(len(store_resp.messages))]
            assert len(retrieved_hashes) == len(expected_hashes), "message count mismatch"
            assert retrieved_hashes == expected_hashes, "message hash mismatch"

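    # Full mesh of five nodes, each publishing a 100-message burst; every store
    # is then drained via pagination and must hold all 500 messages in order.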
    def test_store_sync_five_node_mesh_burst(self):
        sync_interval = 3
        sync_range = 900
        msgs_per_node = 100
        message_delay = 0.0
        page_size = 100

        self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
        self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}")
        nodes = [self.node1, self.node2, self.node3, self.node4, self.node5]

        for n in nodes:
            n.start(
                store="true",
                store_sync="true",
                store_sync_interval=sync_interval,
                store_sync_range=sync_range,
                store_sync_relay_jitter=0,
                relay="true",
                dns_discovery="false",
            )
            n.set_relay_subscriptions([self.test_pubsub_topic])

        for i, a in enumerate(nodes):
            for b in nodes[i + 1 :]:
                self.add_node_peer(a, [b.get_multiaddr_with_id()])
                self.add_node_peer(b, [a.get_multiaddr_with_id()])

        expected_hashes = []
        for _ in range(msgs_per_node):
            msgs = [self.create_message() for _ in nodes]
            for node, msg in zip(nodes, msgs):
                self.publish_message(
                    sender=node,
                    via="relay",
                    message=msg,
                    message_propagation_delay=message_delay,
                )
                expected_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))

        delay(sync_interval * 4 + 20)

        for node in nodes:
            store_resp = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, node)
            retrieved_hashes = []
            while store_resp.pagination_cursor is not None:
                cursor = store_resp.pagination_cursor
                store_resp = self.get_messages_from_store(
                    node,
                    page_size=page_size,
                    cursor=cursor,
                    ascending="true",
                )
                for i in range(len(store_resp.messages)):
                    retrieved_hashes.append(store_resp.message_hash(i))
            assert len(retrieved_hashes) == len(expected_hashes), f"{node.name}: message count mismatch"
            assert retrieved_hashes == expected_hashes, f"{node.name}: message hash mismatch"