mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
fix: waku sync 2.0 codecs ENR support (#3326)
This commit is contained in:
parent
91e5c7bc13
commit
bf1a0dc42c
@ -1,188 +0,0 @@
|
|||||||
{.used.}
|
|
||||||
|
|
||||||
import std/net, testutils/unittests, chronos, libp2p/crypto/crypto
|
|
||||||
|
|
||||||
import
|
|
||||||
../../waku/
|
|
||||||
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
|
|
||||||
../waku_store/store_utils,
|
|
||||||
../waku_archive/archive_utils,
|
|
||||||
../testlib/[wakucore, wakunode, testasync]
|
|
||||||
|
|
||||||
suite "Store Sync - End to End":
|
|
||||||
var server {.threadvar.}: WakuNode
|
|
||||||
var client {.threadvar.}: WakuNode
|
|
||||||
|
|
||||||
asyncSetup:
|
|
||||||
let timeOrigin = now()
|
|
||||||
|
|
||||||
let messages =
|
|
||||||
@[
|
|
||||||
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
|
|
||||||
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
|
|
||||||
]
|
|
||||||
|
|
||||||
let
|
|
||||||
serverKey = generateSecp256k1Key()
|
|
||||||
clientKey = generateSecp256k1Key()
|
|
||||||
|
|
||||||
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
|
|
||||||
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
|
|
||||||
|
|
||||||
let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
|
|
||||||
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
|
|
||||||
|
|
||||||
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
|
|
||||||
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
|
|
||||||
|
|
||||||
assert mountServerArchiveRes.isOk()
|
|
||||||
assert mountClientArchiveRes.isOk()
|
|
||||||
|
|
||||||
await server.mountStore()
|
|
||||||
await client.mountStore()
|
|
||||||
|
|
||||||
client.mountStoreClient()
|
|
||||||
server.mountStoreClient()
|
|
||||||
|
|
||||||
let mountServerSync = await server.mountWakuSync(
|
|
||||||
maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds
|
|
||||||
)
|
|
||||||
let mountClientSync = await client.mountWakuSync(
|
|
||||||
maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds
|
|
||||||
)
|
|
||||||
|
|
||||||
assert mountServerSync.isOk(), mountServerSync.error
|
|
||||||
assert mountClientSync.isOk(), mountClientSync.error
|
|
||||||
|
|
||||||
# messages are retreived when mounting Waku sync
|
|
||||||
# but based on interval so this is needed for client only
|
|
||||||
for msg in messages:
|
|
||||||
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
|
|
||||||
|
|
||||||
await allFutures(server.start(), client.start())
|
|
||||||
|
|
||||||
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
|
||||||
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()
|
|
||||||
|
|
||||||
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
|
|
||||||
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)
|
|
||||||
|
|
||||||
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
|
|
||||||
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)
|
|
||||||
|
|
||||||
asyncTeardown:
|
|
||||||
# prevent premature channel shutdown
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
await allFutures(client.stop(), server.stop())
|
|
||||||
|
|
||||||
asyncTest "no message set differences":
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() == server.wakuSync.storageSize()
|
|
||||||
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() == server.wakuSync.storageSize()
|
|
||||||
|
|
||||||
asyncTest "client message set differences":
|
|
||||||
let msg = fakeWakuMessage(@[byte 10])
|
|
||||||
|
|
||||||
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
|
|
||||||
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
|
|
||||||
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() != server.wakuSync.storageSize()
|
|
||||||
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() == server.wakuSync.storageSize()
|
|
||||||
|
|
||||||
asyncTest "server message set differences":
|
|
||||||
let msg = fakeWakuMessage(@[byte 10])
|
|
||||||
|
|
||||||
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
|
|
||||||
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
|
|
||||||
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() != server.wakuSync.storageSize()
|
|
||||||
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() == server.wakuSync.storageSize()
|
|
||||||
|
|
||||||
suite "Waku Sync - Pruning":
|
|
||||||
var server {.threadvar.}: WakuNode
|
|
||||||
var client {.threadvar.}: WakuNode
|
|
||||||
|
|
||||||
asyncSetup:
|
|
||||||
let
|
|
||||||
serverKey = generateSecp256k1Key()
|
|
||||||
clientKey = generateSecp256k1Key()
|
|
||||||
|
|
||||||
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
|
|
||||||
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
|
|
||||||
|
|
||||||
let serverArchiveDriver = newSqliteArchiveDriver()
|
|
||||||
let clientArchiveDriver = newSqliteArchiveDriver()
|
|
||||||
|
|
||||||
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
|
|
||||||
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
|
|
||||||
|
|
||||||
assert mountServerArchiveRes.isOk()
|
|
||||||
assert mountClientArchiveRes.isOk()
|
|
||||||
|
|
||||||
await server.mountStore()
|
|
||||||
await client.mountStore()
|
|
||||||
|
|
||||||
client.mountStoreClient()
|
|
||||||
server.mountStoreClient()
|
|
||||||
|
|
||||||
let mountServerSync = await server.mountWakuSync(
|
|
||||||
maxFrameSize = 0,
|
|
||||||
relayJitter = 0.seconds,
|
|
||||||
syncRange = 1.hours,
|
|
||||||
syncInterval = 5.minutes,
|
|
||||||
)
|
|
||||||
let mountClientSync = await client.mountWakuSync(
|
|
||||||
maxFrameSize = 0,
|
|
||||||
syncRange = 10.milliseconds,
|
|
||||||
syncInterval = 10.milliseconds,
|
|
||||||
relayJitter = 0.seconds,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert mountServerSync.isOk(), mountServerSync.error
|
|
||||||
assert mountClientSync.isOk(), mountClientSync.error
|
|
||||||
|
|
||||||
await allFutures(server.start(), client.start())
|
|
||||||
|
|
||||||
asyncTeardown:
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
await allFutures(client.stop(), server.stop())
|
|
||||||
|
|
||||||
asyncTest "pruning":
|
|
||||||
for _ in 0 ..< 4:
|
|
||||||
for _ in 0 ..< 10:
|
|
||||||
let msg = fakeWakuMessage()
|
|
||||||
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
|
|
||||||
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
|
|
||||||
|
|
||||||
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
|
|
||||||
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
|
|
||||||
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
|
|
||||||
check:
|
|
||||||
client.wakuSync.storageSize() == 10
|
|
||||||
server.wakuSync.storageSize() == 40
|
|
||||||
@ -5,7 +5,6 @@ const
|
|||||||
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
|
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
|
||||||
WakuLightPushCodec* = "/vac/waku/lightpush/3.0.0"
|
WakuLightPushCodec* = "/vac/waku/lightpush/3.0.0"
|
||||||
WakuLegacyLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
WakuLegacyLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||||
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
|
|
||||||
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
|
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
|
||||||
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
|
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
|
||||||
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
|
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
|
||||||
|
|||||||
@ -26,7 +26,7 @@ const capabilityToCodec = {
|
|||||||
Capabilities.Store: WakuStoreCodec,
|
Capabilities.Store: WakuStoreCodec,
|
||||||
Capabilities.Filter: WakuFilterSubscribeCodec,
|
Capabilities.Filter: WakuFilterSubscribeCodec,
|
||||||
Capabilities.Lightpush: WakuLightPushCodec,
|
Capabilities.Lightpush: WakuLightPushCodec,
|
||||||
Capabilities.Sync: WakuSyncCodec,
|
Capabilities.Sync: WakuReconciliationCodec,
|
||||||
}.toTable
|
}.toTable
|
||||||
|
|
||||||
func init*(
|
func init*(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user