mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-14 17:04:53 +00:00
2cc86c51da
* feat: Waku Sync Protocol * feat: state machine (#2656) * feat: pruning storage mehcanism (#2673) * feat: message transfer mechanism & tests (#2688) * update docker files * added ENR filed for sync & misc. fixes * adding new sync range param & fixes --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
375 lines
12 KiB
Nim
375 lines
12 KiB
Nim
{.used.}
|
|
|
|
import
|
|
std/[options, sets],
|
|
testutils/unittests,
|
|
chronos,
|
|
chronicles,
|
|
libp2p/crypto/crypto,
|
|
stew/byteutils,
|
|
std/random
|
|
|
|
import
|
|
../../waku/[
|
|
node/peer_manager,
|
|
waku_core,
|
|
waku_core/message/digest,
|
|
waku_sync,
|
|
waku_sync/raw_bindings,
|
|
],
|
|
../testlib/[wakucore, testasync],
|
|
./sync_utils
|
|
|
|
random.randomize()
|
|
|
|
suite "Waku Sync":
|
|
var serverSwitch {.threadvar.}: Switch
|
|
var clientSwitch {.threadvar.}: Switch
|
|
|
|
var server {.threadvar.}: WakuSync
|
|
var client {.threadvar.}: WakuSync
|
|
|
|
var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo]
|
|
|
|
asyncSetup:
|
|
serverSwitch = newTestSwitch()
|
|
clientSwitch = newTestSwitch()
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
server = await newTestWakuSync(serverSwitch)
|
|
client = await newTestWakuSync(clientSwitch)
|
|
|
|
serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
asyncTeardown:
|
|
await sleepAsync(10.milliseconds)
|
|
|
|
await allFutures(server.stop(), client.stop())
|
|
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
|
|
|
suite "Protocol":
|
|
asyncTest "sync 2 nodes both empty":
|
|
let hashes = await client.storeSynchronization(serverPeerInfo)
|
|
assert hashes.isOk(), hashes.error
|
|
check:
|
|
hashes.value[0].len == 0
|
|
|
|
asyncTest "sync 2 nodes empty client full server":
|
|
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
|
|
server.messageIngress(DefaultPubsubTopic, msg1)
|
|
server.messageIngress(DefaultPubsubTopic, msg2)
|
|
server.messageIngress(DefaultPubsubTopic, msg3)
|
|
|
|
var hashes = await client.storeSynchronization(serverPeerInfo)
|
|
|
|
assert hashes.isOk(), hashes.error
|
|
check:
|
|
hashes.value[0].len == 3
|
|
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0]
|
|
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0]
|
|
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0]
|
|
|
|
asyncTest "sync 2 nodes full client empty server":
|
|
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
|
|
client.messageIngress(DefaultPubsubTopic, msg1)
|
|
client.messageIngress(DefaultPubsubTopic, msg2)
|
|
client.messageIngress(DefaultPubsubTopic, msg3)
|
|
|
|
var hashes = await client.storeSynchronization(serverPeerInfo)
|
|
assert hashes.isOk(), hashes.error
|
|
check:
|
|
hashes.value[0].len == 0
|
|
|
|
asyncTest "sync 2 nodes different hashes":
|
|
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
|
|
server.messageIngress(DefaultPubsubTopic, msg1)
|
|
client.messageIngress(DefaultPubsubTopic, msg1)
|
|
server.messageIngress(DefaultPubsubTopic, msg2)
|
|
|
|
var syncRes = await client.storeSynchronization(serverPeerInfo)
|
|
|
|
check:
|
|
syncRes.isOk()
|
|
|
|
var hashes = syncRes.get()
|
|
|
|
check:
|
|
hashes[0].len == 1
|
|
hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
|
|
|
|
#Assuming message is fetched from peer
|
|
client.messageIngress(DefaultPubsubTopic, msg2)
|
|
|
|
syncRes = await client.storeSynchronization(serverPeerInfo)
|
|
|
|
check:
|
|
syncRes.isOk()
|
|
|
|
hashes = syncRes.get()
|
|
|
|
check:
|
|
hashes[0].len == 0
|
|
|
|
asyncTest "sync 2 nodes same hashes":
|
|
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
|
|
server.messageIngress(DefaultPubsubTopic, msg1)
|
|
client.messageIngress(DefaultPubsubTopic, msg1)
|
|
server.messageIngress(DefaultPubsubTopic, msg2)
|
|
client.messageIngress(DefaultPubsubTopic, msg2)
|
|
|
|
let hashes = await client.storeSynchronization(serverPeerInfo)
|
|
assert hashes.isOk(), $hashes.error
|
|
check:
|
|
hashes.value[0].len == 0
|
|
|
|
asyncTest "sync 2 nodes 100K msgs":
|
|
var i = 0
|
|
let msgCount = 100000
|
|
var diffIndex = rand(msgCount)
|
|
var diffMsg: WakuMessage
|
|
while i < msgCount:
|
|
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
if i != diffIndex:
|
|
client.messageIngress(DefaultPubsubTopic, msg)
|
|
else:
|
|
diffMsg = msg
|
|
server.messageIngress(DefaultPubsubTopic, msg)
|
|
i += 1
|
|
|
|
let hashes = await client.storeSynchronization(serverPeerInfo)
|
|
assert hashes.isOk(), $hashes.error
|
|
|
|
check:
|
|
hashes.value[0].len == 1
|
|
hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg)
|
|
|
|
asyncTest "sync 2 nodes 100K msgs 10K diffs":
|
|
var i = 0
|
|
let msgCount = 100000
|
|
var diffCount = 10000
|
|
|
|
var diffMsgHashes: seq[WakuMessageHash]
|
|
var randIndexes: seq[int]
|
|
while i < diffCount:
|
|
let randInt = rand(msgCount)
|
|
if randInt in randIndexes:
|
|
continue
|
|
randIndexes.add(randInt)
|
|
i += 1
|
|
|
|
i = 0
|
|
var tmpDiffCnt = diffCount
|
|
while i < msgCount:
|
|
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
if tmpDiffCnt > 0 and i in randIndexes:
|
|
diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg))
|
|
tmpDiffCnt = tmpDiffCnt - 1
|
|
else:
|
|
client.messageIngress(DefaultPubsubTopic, msg)
|
|
|
|
server.messageIngress(DefaultPubsubTopic, msg)
|
|
i += 1
|
|
|
|
let hashes = await client.storeSynchronization(serverPeerInfo)
|
|
assert hashes.isOk(), $hashes.error
|
|
|
|
check:
|
|
hashes.value[0].len == diffCount
|
|
toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes)
|
|
|
|
asyncTest "sync 3 nodes 2 client 1 server":
|
|
## Setup
|
|
let client2Switch = newTestSwitch()
|
|
await client2Switch.start()
|
|
let client2 = await newTestWakuSync(client2Switch)
|
|
|
|
let msgCount = 10000
|
|
var i = 0
|
|
|
|
while i < msgCount:
|
|
i += 1
|
|
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
if i mod 2 == 0:
|
|
client2.messageIngress(DefaultPubsubTopic, msg)
|
|
else:
|
|
client.messageIngress(DefaultPubsubTopic, msg)
|
|
server.messageIngress(DefaultPubsubTopic, msg)
|
|
|
|
let fut1 = client.storeSynchronization(serverPeerInfo)
|
|
let fut2 = client2.storeSynchronization(serverPeerInfo)
|
|
waitFor allFutures(fut1, fut2)
|
|
|
|
let hashes1 = fut1.read()
|
|
let hashes2 = fut2.read()
|
|
|
|
assert hashes1.isOk(), $hashes1.error
|
|
assert hashes2.isOk(), $hashes2.error
|
|
|
|
check:
|
|
hashes1.value[0].len == int(msgCount / 2)
|
|
hashes2.value[0].len == int(msgCount / 2)
|
|
|
|
await client2.stop()
|
|
await client2Switch.stop()
|
|
|
|
asyncTest "sync 6 nodes varying sync diffs":
|
|
## Setup
|
|
let
|
|
client2Switch = newTestSwitch()
|
|
client3Switch = newTestSwitch()
|
|
client4Switch = newTestSwitch()
|
|
client5Switch = newTestSwitch()
|
|
|
|
await allFutures(
|
|
client2Switch.start(),
|
|
client3Switch.start(),
|
|
client4Switch.start(),
|
|
client5Switch.start(),
|
|
)
|
|
|
|
let
|
|
client2 = await newTestWakuSync(client2Switch)
|
|
client3 = await newTestWakuSync(client3Switch)
|
|
client4 = await newTestWakuSync(client4Switch)
|
|
client5 = await newTestWakuSync(client5Switch)
|
|
|
|
let msgCount = 100000
|
|
var i = 0
|
|
|
|
while i < msgCount:
|
|
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
if i < msgCount - 1:
|
|
client.messageIngress(DefaultPubsubTopic, msg)
|
|
if i < msgCount - 10:
|
|
client2.messageIngress(DefaultPubsubTopic, msg)
|
|
if i < msgCount - 100:
|
|
client3.messageIngress(DefaultPubsubTopic, msg)
|
|
if i < msgCount - 1000:
|
|
client4.messageIngress(DefaultPubsubTopic, msg)
|
|
if i < msgCount - 10000:
|
|
client5.messageIngress(DefaultPubsubTopic, msg)
|
|
server.messageIngress(DefaultPubsubTopic, msg)
|
|
i += 1
|
|
|
|
var timeBefore = getNowInNanosecondTime()
|
|
let hashes1 = await client.storeSynchronization(serverPeerInfo)
|
|
var timeAfter = getNowInNanosecondTime()
|
|
var syncTime = (timeAfter - timeBefore)
|
|
debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime
|
|
assert hashes1.isOk(), $hashes1.error
|
|
check:
|
|
hashes1.value[0].len == 1
|
|
|
|
timeBefore = getNowInNanosecondTime()
|
|
let hashes2 = await client2.storeSynchronization(serverPeerInfo)
|
|
timeAfter = getNowInNanosecondTime()
|
|
syncTime = (timeAfter - timeBefore)
|
|
debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime
|
|
assert hashes2.isOk(), $hashes2.error
|
|
check:
|
|
hashes2.value[0].len == 10
|
|
|
|
timeBefore = getNowInNanosecondTime()
|
|
let hashes3 = await client3.storeSynchronization(serverPeerInfo)
|
|
timeAfter = getNowInNanosecondTime()
|
|
syncTime = (timeAfter - timeBefore)
|
|
debug "sync time in seconds",
|
|
msgsTotal = msgCount, diff = 100, syncTime = syncTime
|
|
assert hashes3.isOk(), $hashes3.error
|
|
check:
|
|
hashes3.value[0].len == 100
|
|
|
|
timeBefore = getNowInNanosecondTime()
|
|
let hashes4 = await client4.storeSynchronization(serverPeerInfo)
|
|
timeAfter = getNowInNanosecondTime()
|
|
syncTime = (timeAfter - timeBefore)
|
|
debug "sync time in seconds",
|
|
msgsTotal = msgCount, diff = 1000, syncTime = syncTime
|
|
assert hashes4.isOk(), $hashes4.error
|
|
check:
|
|
hashes4.value[0].len == 1000
|
|
|
|
timeBefore = getNowInNanosecondTime()
|
|
let hashes5 = await client5.storeSynchronization(serverPeerInfo)
|
|
timeAfter = getNowInNanosecondTime()
|
|
syncTime = (timeAfter - timeBefore)
|
|
debug "sync time in seconds",
|
|
msgsTotal = msgCount, diff = 10000, syncTime = syncTime
|
|
assert hashes5.isOk(), $hashes5.error
|
|
check:
|
|
hashes5.value[0].len == 10000
|
|
|
|
await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop())
|
|
await allFutures(
|
|
client2Switch.stop(),
|
|
client3Switch.stop(),
|
|
client4Switch.stop(),
|
|
client5Switch.stop(),
|
|
)
|
|
|
|
asyncTest "sync 3 nodes cyclic":
|
|
let
|
|
node1Switch = newTestSwitch()
|
|
node2Switch = newTestSwitch()
|
|
node3Switch = newTestSwitch()
|
|
|
|
await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start())
|
|
|
|
let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo())
|
|
let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo())
|
|
let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo())
|
|
|
|
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
|
|
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
|
|
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
|
|
let hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
|
|
|
|
let
|
|
node1 = await newTestWakuSync(node1Switch)
|
|
node2 = await newTestWakuSync(node2Switch)
|
|
node3 = await newTestWakuSync(node3Switch)
|
|
|
|
node1.messageIngress(DefaultPubsubTopic, msg1)
|
|
node2.messageIngress(DefaultPubsubTopic, msg1)
|
|
node2.messageIngress(DefaultPubsubTopic, msg2)
|
|
node3.messageIngress(DefaultPubsubTopic, msg3)
|
|
|
|
let f1 = node1.storeSynchronization(node2PeerInfo)
|
|
let f2 = node2.storeSynchronization(node3PeerInfo)
|
|
let f3 = node3.storeSynchronization(node1PeerInfo)
|
|
|
|
waitFor allFutures(f1, f2, f3)
|
|
|
|
let hashes1 = f1.read()
|
|
let hashes2 = f2.read()
|
|
let hashes3 = f3.read()
|
|
|
|
assert hashes1.isOk(), hashes1.error
|
|
assert hashes2.isOk(), hashes2.error
|
|
assert hashes3.isOk(), hashes3.error
|
|
|
|
check:
|
|
hashes1.get()[0].len == 1
|
|
hashes2.get()[0].len == 1
|
|
hashes3.get()[0].len == 1
|
|
|
|
hashes1.get()[0][0] == hash2
|
|
hashes2.get()[0][0] == hash3
|
|
hashes3.get()[0][0] == hash1
|
|
|
|
await allFutures(node1.stop(), node2.stop(), node3.stop())
|
|
await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop())
|