diff --git a/scripts/store_sync.sh b/scripts/store_sync.sh new file mode 100755 index 0000000000..a646ba11de --- /dev/null +++ b/scripts/store_sync.sh @@ -0,0 +1,105 @@ +#!/bin/bash +printf "\nAssuming you already have a docker network called waku\n" +# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku + + +cluster_id=3 +pubsub_topic="/waku/2/rs/$cluster_id/0" +encoded_pubsub_topic=$(echo "$pubsub_topic" | sed 's:/:%2F:g') +content_topic="/test/1/store/proto" +encoded_content_topic=$(echo "$content_topic" | sed 's:/:%2F:g') +node_1=wakuorg/nwaku:latest +node_1_ip=172.18.64.13 +node_1_rest=32261 +node_1_tcp=32262 +node_2=wakuorg/nwaku:latest +node_2_ip=172.18.64.14 +node_2_rest=7588 +node_3=wakuorg/nwaku:latest +node_3_ip=172.18.64.15 +node_3_rest=8588 + + +printf "\nStarting containers\n" + +container_id1=$(docker run -d -i -t -p $node_1_rest:$node_1_rest -p $node_1_tcp:$node_1_tcp -p 32263:32263 -p 32264:32264 -p 32265:32265 $node_1 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=32263 --rest-port=$node_1_rest --tcp-port=$node_1_tcp --discv5-udp-port=32264 --rest-address=0.0.0.0 --nat=extip:$node_1_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --nodekey=4fa0fd1b13b89ba45dd77037b6cac09bb5ccdc0df8fab44efeb066da2a378744 --shard=0 --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=19552 --metrics-logging=true --store=true --store-sync=true --relay=true --lightpush=true) +docker network connect --ip $node_1_ip waku $container_id1 + +printf "\nSleeping 2 seconds\n" +sleep 2 + +response=$(curl -X GET "http://127.0.0.1:$node_1_rest/debug/v1/info" -H "accept: application/json") +enrUri=$(echo $response | jq -r '.enrUri') + +# Extract the first non-WebSocket address +ws_address=$(echo $response | jq -r '.listenAddresses[] | select(contains("/ws") | not)') + +# Check if we got an address, and construct the new address with it +if [[ $ws_address != "" ]]; then + identifier=$(echo $ws_address | awk -F'/p2p/' '{print $2}') + if [[ $identifier != "" ]]; then + multiaddr_with_id="/ip4/${node_1_ip}/tcp/${node_1_tcp}/p2p/${identifier}" + echo $multiaddr_with_id + else + echo "No identifier found in the address." + exit 1 + fi +else + echo "No non-WebSocket address found." + exit 1 +fi + +container_id2=$(docker run -d -i -t -p $node_2_rest:$node_2_rest -p 4589:4589 -p 4590:4590 -p 4591:4591 -p 4592:4592 $node_2 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=4590 --rest-port=$node_2_rest --tcp-port=4589 --discv5-udp-port=4591 --rest-address=0.0.0.0 --nat=extip:$node_2_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --nodekey=0efae97aa2a5303aa77dc27d11d47bbbe0ddb8b55eeceaeb61de2ec5efe7e2a5 --shard=0 --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=4592 --metrics-logging=true --store=true --store-sync=true --relay=false --lightpush=true --storenode=$multiaddr_with_id --discv5-bootstrap-node=$enrUri --lightpushnode=$multiaddr_with_id) + +docker network connect --ip $node_2_ip waku $container_id2 + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nConnect peers\n" +curl -X POST "http://127.0.0.1:$node_2_rest/admin/v1/peers" -H "Content-Type: application/json" -d "[\"$multiaddr_with_id\"]" + +container_id3=$(docker run -d -i -t -p $node_3_rest:$node_3_rest -p 5589:5589 -p 5590:5590 -p 5591:5591 -p 5592:5592 $node_3 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=5590 --rest-port=$node_3_rest --tcp-port=5589 --discv5-udp-port=5591 --rest-address=0.0.0.0 --nat=extip:$node_3_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --nodekey=0efae97aa2a5303aa77dc27d11d47bbbe0ddb8b55eeceaeb61de2ec5efe7e2a8 --shard=0 --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=5592 --metrics-logging=true --store=true --store-sync=true --relay=true --storenode=$multiaddr_with_id --discv5-bootstrap-node=$enrUri) +docker network connect --ip $node_3_ip waku $container_id3 + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nConnect peers\n" +curl -X POST "http://127.0.0.1:$node_3_rest/admin/v1/peers" -H "Content-Type: application/json" -d "[\"$multiaddr_with_id\"]" + +printf "\nSubscribe\n" +curl -X POST "http://127.0.0.1:$node_1_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]" +curl -X POST "http://127.0.0.1:$node_3_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]" + + +printf "\nSleeping 1 seconds\n" +sleep 1 + +printf "\nRelay from NODE 3\n" +curl -X POST "http://127.0.0.1:$node_3_rest/relay/v1/messages/$encoded_pubsub_topic" \ +-H "Content-Type: application/json" \ +-d '{"payload": "UmVsYXkgd29ya3MhIQ==", "contentTopic": "'"$content_topic"'", "timestamp": '$(date +%s%N)'}' + +printf "\nLightpush from NODE 2\n" +curl -v -X POST "http://127.0.0.1:$node_2_rest/lightpush/v1/message" -H "Content-Type: application/json" -d '{"pubsubTopic": "'"$pubsub_topic"'", "message": {"payload": "TGlnaHQgcHVzaCB3b3JrcyEh", "contentTopic": "'"$content_topic"'", "timestamp": '$(date +%s%N)'}}' + + +printf "\nSleeping 1 seconds\n" +sleep 1 + + +printf "\nCheck message was stored in NODE 1 (relay) with v1 API\n" +response=$(curl -X GET "http://127.0.0.1:$node_1_rest/store/v1/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true") +printf "\nResponse: $response\n" + +printf "\nCheck message was stored in NODE 3 (relay) with v1 API\n" +response=$(curl -X GET "http://127.0.0.1:$node_3_rest/store/v1/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true") +printf "\nResponse: $response\n" + +printf "\nSleeping 10 seconds\n" +sleep 10 + +printf "\nCheck message was stored in NODE 2 (lightpush) with v3 API\n" +response=$(curl -X GET "http://127.0.0.1:$node_2_rest/store/v3/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true") +printf "\nResponse: $response\n" \ No newline at end of file diff --git a/src/steps/store.py b/src/steps/store.py index ca18788f0a..dbf503a153 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -200,21 +200,19 @@ class StepsStore(StepsCommon): page_size=None, ascending=None, store_v="v3", - message_to_check=None, + messages_to_check=None, **kwargs, ): if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic - if message_to_check is None: - message_to_check = self.message + if messages_to_check is None: + messages_to_check = [self.message] if store_node is None: store_node = self.store_nodes elif not isinstance(store_node, list): store_node = [store_node] - else: - store_node = store_node for node in store_node: - logger.debug(f"Checking that peer {node.image} can find the stored message") + logger.debug(f"Checking that peer {node.image} can find the stored messages") self.store_response = self.get_messages_from_store( node=node, peer_addr=peer_addr, @@ -232,16 +230,27 @@ class StepsStore(StepsCommon): ) assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}" - assert len(self.store_response.messages) >= 1, "Expected at least 1 message but got none" - store_message_index = -1 # we are looking for the last and most recent message in the store - waku_message = WakuMessage([self.store_response.messages[store_message_index:]]) - if store_v == "v1": - waku_message.assert_received_message(message_to_check) + assert len(self.store_response.messages) >= len( + messages_to_check + ), f"Expected at least {len(messages_to_check)} messages but got {len(self.store_response.messages)}" + + # Determine the range of indices for message comparison + if len(messages_to_check) == 1: + indices = [-1] # Use the last message in the store response for a single message else: - expected_hash = self.compute_message_hash(pubsub_topic, message_to_check) - assert expected_hash == self.store_response.message_hash( - store_message_index - ), f"Message hash returned by store doesn't match the computed message hash {expected_hash}" + indices = range(len(messages_to_check)) # Use corresponding indices for multiple messages + + # Iterate through messages_to_check and their respective indices + for idx, message_to_check in zip(indices, messages_to_check): + if store_v == "v1": + waku_message = WakuMessage([self.store_response.messages[idx]]) + waku_message.assert_received_message(message_to_check) + else: + expected_hash = self.compute_message_hash(pubsub_topic, message_to_check) + actual_hash = self.store_response.message_hash(idx) + assert ( + expected_hash == actual_hash + ), f"Message hash at index {idx} returned by store doesn't match the computed message hash {expected_hash}. Actual hash: {actual_hash}" @allure.step def check_store_returns_empty_response(self, pubsub_topic=None): diff --git a/src/test_data.py b/src/test_data.py index 745ee7c98f..bc581c1850 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -178,7 +178,6 @@ LOG_ERROR_KEYWORDS = [ "segfault", "corrupt", "terminated", - "oom", "unhandled", "stacktrace", "deadlock", diff --git a/tests/e2e/test_e2e.py b/tests/e2e/test_e2e.py index 0c5eee3ad0..6a9bcc9492 100644 --- a/tests/e2e/test_e2e.py +++ b/tests/e2e/test_e2e.py @@ -147,7 +147,7 @@ class TestE2E(StepsFilter, StepsStore, StepsRelay, StepsLightPush): message = self.create_message() self.node1.send_light_push_message(self.create_payload(message=message)) delay(1) - self.check_published_message_is_stored(page_size=50, ascending="true", store_node=self.node3, message_to_check=message) + self.check_published_message_is_stored(page_size=50, ascending="true", store_node=self.node3, messages_to_check=[message]) def test_chain_of_relay_nodes(self): self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") diff --git a/tests/store/test_ephemeral.py b/tests/store/test_ephemeral.py index acd1824aa1..0acccb79b6 100644 --- a/tests/store/test_ephemeral.py +++ b/tests/store/test_ephemeral.py @@ -18,9 +18,9 @@ class TestEphemeral(StepsStore): def test_message_with_both_ephemeral_true_and_false(self): self.publish_message(message=self.create_message(ephemeral=True)) stored = self.publish_message(message=self.create_message(ephemeral=False)) - self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored) + self.check_published_message_is_stored(page_size=5, ascending="true", messages_to_check=[stored]) assert len(self.store_response.messages) == 1 stored = self.publish_message(message=self.create_message(ephemeral=False)) self.publish_message(message=self.create_message(ephemeral=True)) - self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored) + self.check_published_message_is_stored(page_size=5, ascending="true", messages_to_check=[stored]) assert len(self.store_response.messages) == 2 diff --git a/tests/store_sync/__init__.py b/tests/store_sync/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/store_sync/test_store_sync.py b/tests/store_sync/test_store_sync.py new file mode 100644 index 0000000000..a0d7bffb78 --- /dev/null +++ b/tests/store_sync/test_store_sync.py @@ -0,0 +1,488 @@ +import pytest +from src.env_vars import NODE_1, NODE_2 +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.node.store_response import StoreResponse +from src.node.waku_node import WakuNode +from src.steps.store import StepsStore + +logger = get_custom_logger(__name__) + +""" +In those tests we aim to combine multiple protocols/node types and create a more end-to-end scenario +""" + + +@pytest.mark.skipif("go-waku" in (NODE_1 + NODE_2), reason="Test works only with nwaku") +class TestStoreSync(StepsStore): + @pytest.fixture(scope="function", autouse=True) + def nodes(self): + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_1, f"node2_{self.test_id}") + self.node3 = WakuNode(NODE_1, f"node3_{self.test_id}") + self.num_messages = 10 + + def test_sync_nodes_are_relay(self): + self.node1.start(store="true", relay="true") + self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + + delay(2) # wait for the sync to finish + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages + ), f"Store messages are not equal to each other or not equal to {self.num_messages}" + + def test_sync_nodes_have_store_true(self): + self.node1.start(store="true", relay="true") + self.node2.start(store="true", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(store="true", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages + ), f"Store messages are not equal to each other or not equal to {self.num_messages}" + + def test_sync_nodes_are_not_relay_and_have_storenode_set(self): + self.node1.start(store="true", relay="true") + self.node2.start( + store="false", + store_sync="true", + relay="false", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.node3.start( + store="false", + store_sync="true", + relay="false", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages + ), f"Store messages are not equal to each other or not equal to {self.num_messages}" + + def test_sync_messages_received_via_lightpush(self): + self.node1.start(store="true", store_sync="true", relay="true", lightpush="true") + self.node2.start( + store="true", + store_sync="true", + relay="true", + lightpush="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + lightpushnode=self.node1.get_multiaddr_with_id(), + ) + self.node3.start( + store="true", + store_sync="true", + relay="true", + storenode=self.node2.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="lightpush") for _ in range(self.num_messages)] + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages + ), f"Store messages are not equal to each other or not equal to {self.num_messages}" + + def test_check_sync_when_2_nodes_publish(self): + self.node1.start(store="true", store_sync="true", relay="true") + self.node2.start( + store="true", + store_sync="true", + relay="true", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.node3.start( + store="false", + store_sync="true", + relay="false", + storenode=self.node2.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages * 2 + ), f"Store messages are not equal to each other or not equal to {self.num_messages * 2}" + + def test_check_sync_when_all_3_nodes_publish(self): + self.node1.start(store="true", store_sync="true", relay="true") + self.node2.start( + store="true", + store_sync="true", + relay="true", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.node3.start( + store="false", + store_sync="true", + relay="true", + storenode=self.node2.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages * 3 + ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}" + + ######################################################### + + def test_sync_with_one_node_with_delayed_start(self): + self.node1.start(store="true", store_sync="true", relay="true") + self.node2.start( + store="true", + store_sync="true", + relay="true", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + + # start the 3rd node + self.node3.start( + store="false", + store_sync="true", + relay="true", + storenode=self.node2.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + delay(1) # wait for the sync to finish + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages + ), f"Store messages are not equal to each other or not equal to {self.num_messages}" + + def test_sync_with_nodes_restart__case1(self): + self.node1.start(store="true", store_sync="true", relay="true") + self.node2.start( + store="true", + store_sync="true", + relay="true", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.node3.start( + store="false", + store_sync="true", + relay="true", + storenode=self.node2.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + + self.node1.restart() + self.node2.restart() + self.node3.restart() + + delay(2) # wait for the sync to finish + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages * 3 + ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}" + + def test_sync_with_nodes_restart__case2(self): + self.node1.start(store="true", relay="true") + self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)] + + self.node2.restart() + + delay(5) # wait for the sync to finish + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages * 3 + ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}" + + def test_high_message_volume_sync(self): + self.node1.start(store="true", store_sync="true", relay="true") + self.node2.start( + store="true", + store_sync="true", + relay="true", + storenode=self.node1.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.node3.start( + store="false", + store_sync="true", + relay="true", + storenode=self.node2.get_multiaddr_with_id(), + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + expected_message_hash_list = [] + + for _ in range(500): # total 1500 messages + messages = [self.create_message() for _ in range(3)] + + for i, node in enumerate([self.node1, self.node2, self.node3]): + self.publish_message(sender=node, via="relay", message=messages[i], message_propagation_delay=0.01) + + expected_message_hash_list.extend([self.compute_message_hash(self.test_pubsub_topic, msg) for msg in messages]) + + delay(5) # wait for the sync to finish + + for node in [self.node1, self.node2, self.node3]: + store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, node) + response_message_hash_list = [] + while store_response.pagination_cursor is not None: + cursor = store_response.pagination_cursor + store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor) + for index in range(len(store_response.messages)): + response_message_hash_list.append(store_response.message_hash(index)) + assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch" + assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch" + + def test_large_message_payload_sync(self): + self.node1.start(store="true", relay="true") + self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + payload_length = 1024 * 100 # after encoding to base64 this will be close to 150KB + + ml1 = [ + self.publish_message( + sender=self.node1, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01 + ) + for _ in range(self.num_messages) + ] + ml2 = [ + self.publish_message( + sender=self.node2, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01 + ) + for _ in range(self.num_messages) + ] + ml3 = [ + self.publish_message( + sender=self.node3, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01 + ) + for _ in range(self.num_messages) + ] + + delay(10) # wait for the sync to finish + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages * 3 + ), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}" + + def test_sync_flags(self): + self.node1.start( + store="true", + store_sync="true", + store_sync_interval=1, + store_sync_range=10, + store_sync_relay_jitter=1, + store_sync_max_payload_size=1000, + relay="true", + ) + self.node2.start( + store="false", + store_sync="true", + store_sync_interval=1, + store_sync_range=10, + store_sync_relay_jitter=1, + store_sync_max_payload_size=1000, + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + self.node3.start( + store="false", + store_sync="true", + store_sync_interval=1, + store_sync_range=10, + store_sync_relay_jitter=1, + store_sync_max_payload_size=1000, + relay="true", + discv5_bootstrap_node=self.node2.get_enr_uri(), + ) + + self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()]) + self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()]) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + self.node3.set_relay_subscriptions([self.test_pubsub_topic]) + + message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)] + + delay(2) # wait for the sync to finish + + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list) + node1_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list) + node2_message = len(self.store_response.messages) + self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list) + node3_message = len(self.store_response.messages) + + assert ( + node1_message == node2_message == node3_message == self.num_messages + ), f"Store messages are not equal to each other or not equal to {self.num_messages}"