Mirror of https://github.com/logos-messaging/logos-messaging-interop-tests.git, synced 2026-01-08 00:43:07 +00:00

Merge remote-tracking branch 'origin/master' into waku_sync_part2

Commit d37a0648e9
@@ -121,6 +121,20 @@ class WakuNode:
            # "filter-subscription-timeout": "600",
        }

        store_sync = kwargs.pop("store_sync", "false").lower() == "true"
        store_sync_range = kwargs.pop("store_sync_range", None)
        store_sync_interval = kwargs.pop("store_sync_interval", None)

        if store_sync:
            default_args["store-sync"] = "true"
            default_args.setdefault("store", "true")

        if store_sync_range:
            default_args["store-sync-range"] = store_sync_range

        if store_sync_interval:
            default_args["store-sync-interval"] = store_sync_interval

        if self.is_nwaku():
            nwaku_args = {
                "shard": "0",
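With these kwargs popped off before the CLI arguments are built, a test can enable periodic store sync directly from start(). A minimal sketch of such a call (values are illustrative; it assumes the harness forwards each default_args entry to the node as a --key=value flag, as it does for the existing keys):

    # Illustrative only: enable store sync on a relay + store node.
    # store_sync_range / store_sync_interval are passed through verbatim
    # as "store-sync-range" / "store-sync-interval".
    node.start(
        store="true",
        store_sync="true",
        store_sync_range=600,
        store_sync_interval=6,
        relay="true",
    )

Leaving store_sync unset (or "false") skips all three branches, so nodes that do not opt in keep their previous configuration.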
@@ -2,6 +2,7 @@ import base64
import hashlib
import inspect
from time import time
from time import time_ns
import allure
import pytest
from datetime import timedelta, datetime
@@ -39,7 +40,9 @@ class StepsCommon:

    @allure.step
    def create_message(self, **kwargs):
        message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
        ts_ns = time_ns()
        ts_ns = int(f"{ts_ns:019d}")
        message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": ts_ns}
        message.update(kwargs)
        return message
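For context on the timestamp: time() returns a float, and a float64 cannot represent a 19-digit nanosecond count exactly, so int(time() * 1e9) rounds away the last couple hundred nanoseconds, while time_ns() returns exact integer nanoseconds. A quick comparison (illustrative, separate from the diff):

    from time import time, time_ns

    approx_ns = int(time() * 1e9)  # float math: the lowest digits are rounded
    exact_ns = time_ns()           # exact integer nanoseconds from the clock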
@@ -225,6 +225,7 @@ class StepsStore(StepsCommon):
            **kwargs,
        )

        logger.debug(f"messages length is {len(self.store_response.messages)}")
        assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}"
        assert len(self.store_response.messages) >= len(
            messages_to_check
@@ -993,3 +993,99 @@ class TestStoreSync(StepsStore):
        resp3 = self.get_messages_from_store(self.node3, page_size=200, cursor="", ascending="true", peer_id="")
        logger.debug("Node3 store has %d messages expected %d", len(resp3.messages), msgs_to_publish)
        assert len(resp3.messages) == msgs_to_publish, f"Node3 store mismatch: expected {msgs_to_publish}, got {len(resp3.messages)}"

    def test_continuous_store_sync(self):
        msgs_per_round = 30
        rounds = 3
        sleep_between_rounds = 30
        publish_delay = 0.01
        sync_interval = 6
        sync_range = 600
        jitter = 0

        self.node1.start(
            store="true",
            store_sync="true",
            relay="true",
            dns_discovery="false",
        )

        self.node2.start(
            store="true",
            store_sync="true",
            store_sync_interval=sync_interval,
            store_sync_range=sync_range,
            store_sync_relay_jitter=jitter,
            relay="false",
            dns_discovery="false",
        )

        self.add_node_peer(self.node2, [self.node1.get_multiaddr_with_id()])

        total_published = 0
        for _ in range(rounds):
            for _ in range(msgs_per_round):
                self.publish_message(sender=self.node1, via="relay")
                total_published += 1
                time.sleep(publish_delay)

            time.sleep(sync_interval * 2)

            resp = self.get_messages_from_store(
                self.node2,
                page_size=100,
                cursor="",
                ascending="true",
                peer_id="",
            )
            logger.debug(f"Node-2 store has {len(resp.messages)}/{total_published} messages")
            assert len(resp.messages) == total_published, f"expected {total_published}, got {len(resp.messages)}"

            time.sleep(sleep_between_rounds)

    def test_store_sync_high_jitter_stress(self):
        sync_interval = 10
        sync_range = 120
        jitter = 90
        msgs_per_node = 50
        message_delay = 0.0
        page_size = 100

        nodes = [self.node1, self.node2, self.node3]

        for n in nodes:
            n.start(
                store="true",
                store_sync="true",
                store_sync_interval=sync_interval,
                store_sync_range=sync_range,
                store_sync_relay_jitter=jitter,
                relay="true",
                dns_discovery="false",
            )
            n.set_relay_subscriptions([self.test_pubsub_topic])

        for i, a in enumerate(nodes):
            for b in nodes[i + 1 :]:
                self.add_node_peer(a, [b.get_multiaddr_with_id()])
                self.add_node_peer(b, [a.get_multiaddr_with_id()])

        expected_hashes = []
        for _ in range(msgs_per_node):
            msgs = [self.create_message() for _ in nodes]
            for node, msg in zip(nodes, msgs):
                self.publish_message(
                    sender=node,
                    via="relay",
                    message=msg,
                    message_propagation_delay=message_delay,
                )
                expected_hashes.append(self.compute_message_hash(self.test_pubsub_topic, msg, hash_type="hex"))

        delay(120)

        for node in nodes:
            store_resp = self.get_messages_from_store(node, page_size=page_size, ascending="true")
            retrieved_hashes = [store_resp.message_hash(i) for i in range(len(store_resp.messages))]
            assert len(retrieved_hashes) == len(expected_hashes), "message count mismatch"
            assert retrieved_hashes == expected_hashes, "message hash mismatch"