store sync tests (#69)

* store sync tests

* fix

* store sync tests

* sync flag
This commit is contained in:
fbarbu15 2024-09-11 12:36:58 +03:00 committed by GitHub
parent cc8bae75c1
commit 86954c6270
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 620 additions and 19 deletions

105
scripts/store_sync.sh Executable file
View File

@ -0,0 +1,105 @@
#!/bin/bash
printf "\nAssuming you already have a docker network called waku\n"
# if not something like this should create it: docker network create --driver bridge --subnet 172.18.0.0/16 --gateway 172.18.0.1 waku
cluster_id=3
pubsub_topic="/waku/2/rs/$cluster_id/0"
encoded_pubsub_topic=$(echo "$pubsub_topic" | sed 's:/:%2F:g')
content_topic="/test/1/store/proto"
encoded_content_topic=$(echo "$content_topic" | sed 's:/:%2F:g')
node_1=wakuorg/nwaku:latest
node_1_ip=172.18.64.13
node_1_rest=32261
node_1_tcp=32262
node_2=wakuorg/nwaku:latest
node_2_ip=172.18.64.14
node_2_rest=7588
node_3=wakuorg/nwaku:latest
node_3_ip=172.18.64.15
node_3_rest=8588
printf "\nStarting containers\n"
container_id1=$(docker run -d -i -t -p $node_1_rest:$node_1_rest -p $node_1_tcp:$node_1_tcp -p 32263:32263 -p 32264:32264 -p 32265:32265 $node_1 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=32263 --rest-port=$node_1_rest --tcp-port=$node_1_tcp --discv5-udp-port=32264 --rest-address=0.0.0.0 --nat=extip:$node_1_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --nodekey=4fa0fd1b13b89ba45dd77037b6cac09bb5ccdc0df8fab44efeb066da2a378744 --shard=0 --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=19552 --metrics-logging=true --store=true --store-sync=true --relay=true --lightpush=true)
docker network connect --ip $node_1_ip waku $container_id1
printf "\nSleeping 2 seconds\n"
sleep 2
response=$(curl -X GET "http://127.0.0.1:$node_1_rest/debug/v1/info" -H "accept: application/json")
enrUri=$(echo $response | jq -r '.enrUri')
# Extract the first non-WebSocket address
ws_address=$(echo $response | jq -r '.listenAddresses[] | select(contains("/ws") | not)')
# Check if we got an address, and construct the new address with it
if [[ $ws_address != "" ]]; then
identifier=$(echo $ws_address | awk -F'/p2p/' '{print $2}')
if [[ $identifier != "" ]]; then
multiaddr_with_id="/ip4/${node_1_ip}/tcp/${node_1_tcp}/p2p/${identifier}"
echo $multiaddr_with_id
else
echo "No identifier found in the address."
exit 1
fi
else
echo "No non-WebSocket address found."
exit 1
fi
container_id2=$(docker run -d -i -t -p $node_2_rest:$node_2_rest -p 4589:4589 -p 4590:4590 -p 4591:4591 -p 4592:4592 $node_2 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=4590 --rest-port=$node_2_rest --tcp-port=4589 --discv5-udp-port=4591 --rest-address=0.0.0.0 --nat=extip:$node_2_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --nodekey=0efae97aa2a5303aa77dc27d11d47bbbe0ddb8b55eeceaeb61de2ec5efe7e2a5 --shard=0 --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=4592 --metrics-logging=true --store=true --store-sync=true --relay=false --lightpush=true --storenode=$multiaddr_with_id --discv5-bootstrap-node=$enrUri --lightpushnode=$multiaddr_with_id)
docker network connect --ip $node_2_ip waku $container_id2
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nConnect peers\n"
curl -X POST "http://127.0.0.1:$node_2_rest/admin/v1/peers" -H "Content-Type: application/json" -d "[\"$multiaddr_with_id\"]"
container_id3=$(docker run -d -i -t -p $node_3_rest:$node_3_rest -p 5589:5589 -p 5590:5590 -p 5591:5591 -p 5592:5592 $node_3 --listen-address=0.0.0.0 --rest=true --rest-admin=true --websocket-support=true --log-level=TRACE --rest-relay-cache-capacity=100 --websocket-port=5590 --rest-port=$node_3_rest --tcp-port=5589 --discv5-udp-port=5591 --rest-address=0.0.0.0 --nat=extip:$node_3_ip --peer-exchange=true --discv5-discovery=true --cluster-id=$cluster_id --nodekey=0efae97aa2a5303aa77dc27d11d47bbbe0ddb8b55eeceaeb61de2ec5efe7e2a8 --shard=0 --metrics-server=true --metrics-server-address=0.0.0.0 --metrics-server-port=5592 --metrics-logging=true --store=true --store-sync=true --relay=true --storenode=$multiaddr_with_id --discv5-bootstrap-node=$enrUri)
docker network connect --ip $node_3_ip waku $container_id3
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nConnect peers\n"
curl -X POST "http://127.0.0.1:$node_3_rest/admin/v1/peers" -H "Content-Type: application/json" -d "[\"$multiaddr_with_id\"]"
printf "\nSubscribe\n"
curl -X POST "http://127.0.0.1:$node_1_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]"
curl -X POST "http://127.0.0.1:$node_3_rest/relay/v1/subscriptions" -H "Content-Type: application/json" -d "[\"$pubsub_topic\"]"
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nRelay from NODE 3\n"
curl -X POST "http://127.0.0.1:$node_3_rest/relay/v1/messages/$encoded_pubsub_topic" \
-H "Content-Type: application/json" \
-d '{"payload": "UmVsYXkgd29ya3MhIQ==", "contentTopic": "'"$content_topic"'", "timestamp": '$(date +%s%N)'}'
printf "\nLightpush from NODE 2\n"
curl -v -X POST "http://127.0.0.1:$node_2_rest/lightpush/v1/message" -H "Content-Type: application/json" -d '{"pubsubTopic": "'"$pubsub_topic"'", "message": {"payload": "TGlnaHQgcHVzaCB3b3JrcyEh", "contentTopic": "'"$content_topic"'", "timestamp": '$(date +%s%N)'}}'
printf "\nSleeping 1 seconds\n"
sleep 1
printf "\nCheck message was stored in NODE 1 (relay) with v1 API\n"
response=$(curl -X GET "http://127.0.0.1:$node_1_rest/store/v1/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true")
printf "\nResponse: $response\n"
printf "\nCheck message was stored in NODE 3 (relay) with v1 API\n"
response=$(curl -X GET "http://127.0.0.1:$node_3_rest/store/v1/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true")
printf "\nResponse: $response\n"
printf "\nSleeping 10 seconds\n"
sleep 10
printf "\nCheck message was stored in NODE 2 (lightpush) with v3 API\n"
response=$(curl -X GET "http://127.0.0.1:$node_2_rest/store/v3/messages?contentTopics=$encoded_content_topic&pageSize=5&ascending=true")
printf "\nResponse: $response\n"

View File

@ -200,21 +200,19 @@ class StepsStore(StepsCommon):
page_size=None,
ascending=None,
store_v="v3",
message_to_check=None,
messages_to_check=None,
**kwargs,
):
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
if message_to_check is None:
message_to_check = self.message
if messages_to_check is None:
messages_to_check = [self.message]
if store_node is None:
store_node = self.store_nodes
elif not isinstance(store_node, list):
store_node = [store_node]
else:
store_node = store_node
for node in store_node:
logger.debug(f"Checking that peer {node.image} can find the stored message")
logger.debug(f"Checking that peer {node.image} can find the stored messages")
self.store_response = self.get_messages_from_store(
node=node,
peer_addr=peer_addr,
@ -232,16 +230,27 @@ class StepsStore(StepsCommon):
)
assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}"
assert len(self.store_response.messages) >= 1, "Expected at least 1 message but got none"
store_message_index = -1 # we are looking for the last and most recent message in the store
waku_message = WakuMessage([self.store_response.messages[store_message_index:]])
if store_v == "v1":
waku_message.assert_received_message(message_to_check)
assert len(self.store_response.messages) >= len(
messages_to_check
), f"Expected at least {len(messages_to_check)} messages but got {len(self.store_response.messages)}"
# Determine the range of indices for message comparison
if len(messages_to_check) == 1:
indices = [-1] # Use the last message in the store response for a single message
else:
expected_hash = self.compute_message_hash(pubsub_topic, message_to_check)
assert expected_hash == self.store_response.message_hash(
store_message_index
), f"Message hash returned by store doesn't match the computed message hash {expected_hash}"
indices = range(len(messages_to_check)) # Use corresponding indices for multiple messages
# Iterate through messages_to_check and their respective indices
for idx, message_to_check in zip(indices, messages_to_check):
if store_v == "v1":
waku_message = WakuMessage([self.store_response.messages[idx]])
waku_message.assert_received_message(message_to_check)
else:
expected_hash = self.compute_message_hash(pubsub_topic, message_to_check)
actual_hash = self.store_response.message_hash(idx)
assert (
expected_hash == actual_hash
), f"Message hash at index {idx} returned by store doesn't match the computed message hash {expected_hash}. Actual hash: {actual_hash}"
@allure.step
def check_store_returns_empty_response(self, pubsub_topic=None):

View File

@ -178,7 +178,6 @@ LOG_ERROR_KEYWORDS = [
"segfault",
"corrupt",
"terminated",
"oom",
"unhandled",
"stacktrace",
"deadlock",

View File

@ -147,7 +147,7 @@ class TestE2E(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
message = self.create_message()
self.node1.send_light_push_message(self.create_payload(message=message))
delay(1)
self.check_published_message_is_stored(page_size=50, ascending="true", store_node=self.node3, message_to_check=message)
self.check_published_message_is_stored(page_size=50, ascending="true", store_node=self.node3, messages_to_check=[message])
def test_chain_of_relay_nodes(self):
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")

View File

@ -18,9 +18,9 @@ class TestEphemeral(StepsStore):
def test_message_with_both_ephemeral_true_and_false(self):
self.publish_message(message=self.create_message(ephemeral=True))
stored = self.publish_message(message=self.create_message(ephemeral=False))
self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
self.check_published_message_is_stored(page_size=5, ascending="true", messages_to_check=[stored])
assert len(self.store_response.messages) == 1
stored = self.publish_message(message=self.create_message(ephemeral=False))
self.publish_message(message=self.create_message(ephemeral=True))
self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored)
self.check_published_message_is_stored(page_size=5, ascending="true", messages_to_check=[stored])
assert len(self.store_response.messages) == 2

View File

View File

@ -0,0 +1,488 @@
import pytest
from src.env_vars import NODE_1, NODE_2
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.store_response import StoreResponse
from src.node.waku_node import WakuNode
from src.steps.store import StepsStore
logger = get_custom_logger(__name__)
"""
In those tests we aim to combine multiple protocols/node types and create a more end-to-end scenario
"""
@pytest.mark.skipif("go-waku" in (NODE_1 + NODE_2), reason="Test works only with nwaku")
class TestStoreSync(StepsStore):
@pytest.fixture(scope="function", autouse=True)
def nodes(self):
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_1, f"node2_{self.test_id}")
self.node3 = WakuNode(NODE_1, f"node3_{self.test_id}")
self.num_messages = 10
def test_sync_nodes_are_relay(self):
self.node1.start(store="true", relay="true")
self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
delay(2) # wait for the sync to finish
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages
), f"Store messages are not equal to each other or not equal to {self.num_messages}"
def test_sync_nodes_have_store_true(self):
self.node1.start(store="true", relay="true")
self.node2.start(store="true", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(store="true", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages
), f"Store messages are not equal to each other or not equal to {self.num_messages}"
def test_sync_nodes_are_not_relay_and_have_storenode_set(self):
self.node1.start(store="true", relay="true")
self.node2.start(
store="false",
store_sync="true",
relay="false",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.node3.start(
store="false",
store_sync="true",
relay="false",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages
), f"Store messages are not equal to each other or not equal to {self.num_messages}"
def test_sync_messages_received_via_lightpush(self):
self.node1.start(store="true", store_sync="true", relay="true", lightpush="true")
self.node2.start(
store="true",
store_sync="true",
relay="true",
lightpush="true",
discv5_bootstrap_node=self.node1.get_enr_uri(),
lightpushnode=self.node1.get_multiaddr_with_id(),
)
self.node3.start(
store="true",
store_sync="true",
relay="true",
storenode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
message_list = [self.publish_message(sender=self.node1, via="lightpush") for _ in range(self.num_messages)]
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages
), f"Store messages are not equal to each other or not equal to {self.num_messages}"
def test_check_sync_when_2_nodes_publish(self):
self.node1.start(store="true", store_sync="true", relay="true")
self.node2.start(
store="true",
store_sync="true",
relay="true",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.node3.start(
store="false",
store_sync="true",
relay="false",
storenode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages * 2
), f"Store messages are not equal to each other or not equal to {self.num_messages * 2}"
def test_check_sync_when_all_3_nodes_publish(self):
self.node1.start(store="true", store_sync="true", relay="true")
self.node2.start(
store="true",
store_sync="true",
relay="true",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.node3.start(
store="false",
store_sync="true",
relay="true",
storenode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages * 3
), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"
#########################################################
def test_sync_with_one_node_with_delayed_start(self):
self.node1.start(store="true", store_sync="true", relay="true")
self.node2.start(
store="true",
store_sync="true",
relay="true",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
# start the 3rd node
self.node3.start(
store="false",
store_sync="true",
relay="true",
storenode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
delay(1) # wait for the sync to finish
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages
), f"Store messages are not equal to each other or not equal to {self.num_messages}"
def test_sync_with_nodes_restart__case1(self):
self.node1.start(store="true", store_sync="true", relay="true")
self.node2.start(
store="true",
store_sync="true",
relay="true",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.node3.start(
store="false",
store_sync="true",
relay="true",
storenode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
self.node1.restart()
self.node2.restart()
self.node3.restart()
delay(2) # wait for the sync to finish
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages * 3
), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"
def test_sync_with_nodes_restart__case2(self):
self.node1.start(store="true", relay="true")
self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
ml1 = [self.publish_message(sender=self.node1, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml2 = [self.publish_message(sender=self.node2, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
ml3 = [self.publish_message(sender=self.node3, via="relay", message_propagation_delay=0.01) for _ in range(self.num_messages)]
self.node2.restart()
delay(5) # wait for the sync to finish
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages * 3
), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"
def test_high_message_volume_sync(self):
self.node1.start(store="true", store_sync="true", relay="true")
self.node2.start(
store="true",
store_sync="true",
relay="true",
storenode=self.node1.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.node3.start(
store="false",
store_sync="true",
relay="true",
storenode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
expected_message_hash_list = []
for _ in range(500): # total 1500 messages
messages = [self.create_message() for _ in range(3)]
for i, node in enumerate([self.node1, self.node2, self.node3]):
self.publish_message(sender=node, via="relay", message=messages[i], message_propagation_delay=0.01)
expected_message_hash_list.extend([self.compute_message_hash(self.test_pubsub_topic, msg) for msg in messages])
delay(5) # wait for the sync to finish
for node in [self.node1, self.node2, self.node3]:
store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, node)
response_message_hash_list = []
while store_response.pagination_cursor is not None:
cursor = store_response.pagination_cursor
store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor)
for index in range(len(store_response.messages)):
response_message_hash_list.append(store_response.message_hash(index))
assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch"
assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch"
def test_large_message_payload_sync(self):
self.node1.start(store="true", relay="true")
self.node2.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(store="false", store_sync="true", relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
payload_length = 1024 * 100 # after encoding to base64 this will be close to 150KB
ml1 = [
self.publish_message(
sender=self.node1, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01
)
for _ in range(self.num_messages)
]
ml2 = [
self.publish_message(
sender=self.node2, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01
)
for _ in range(self.num_messages)
]
ml3 = [
self.publish_message(
sender=self.node3, via="relay", message=self.create_message(payload=to_base64("a" * (payload_length))), message_propagation_delay=0.01
)
for _ in range(self.num_messages)
]
delay(10) # wait for the sync to finish
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=ml1 + ml2 + ml3)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=ml1 + ml2 + ml3)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=ml1 + ml2 + ml3)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages * 3
), f"Store messages are not equal to each other or not equal to {self.num_messages * 3}"
def test_sync_flags(self):
self.node1.start(
store="true",
store_sync="true",
store_sync_interval=1,
store_sync_range=10,
store_sync_relay_jitter=1,
store_sync_max_payload_size=1000,
relay="true",
)
self.node2.start(
store="false",
store_sync="true",
store_sync_interval=1,
store_sync_range=10,
store_sync_relay_jitter=1,
store_sync_max_payload_size=1000,
relay="true",
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
self.node3.start(
store="false",
store_sync="true",
store_sync_interval=1,
store_sync_range=10,
store_sync_relay_jitter=1,
store_sync_max_payload_size=1000,
relay="true",
discv5_bootstrap_node=self.node2.get_enr_uri(),
)
self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])
self.add_node_peer(self.node3, [self.node2.get_multiaddr_with_id()])
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.node3.set_relay_subscriptions([self.test_pubsub_topic])
message_list = [self.publish_message(sender=self.node1, via="relay") for _ in range(self.num_messages)]
delay(2) # wait for the sync to finish
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node1, messages_to_check=message_list)
node1_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node2, messages_to_check=message_list)
node2_message = len(self.store_response.messages)
self.check_published_message_is_stored(page_size=100, ascending="true", store_node=self.node3, messages_to_check=message_list)
node3_message = len(self.store_response.messages)
assert (
node1_message == node2_message == node3_message == self.num_messages
), f"Store messages are not equal to each other or not equal to {self.num_messages}"