From d41179e562e97109b95eb0cabd7967bca78581a3 Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Tue, 17 Jun 2025 17:37:25 +0300 Subject: [PATCH] test: Waku sync tests part2 (#3397) * Revert "Revert "Add finger print tests"" This reverts commit 36066311f91da31ca69fef3fa327d5e7fda7e50c. * Add state transition test * Add last test for state transition * Add new tests to transfer protocol * Add stree test scenarios * Add stress tests and edge scenarios * Add test outside sync window * Add edge tests * Add last corner test * Apply linters on files --- tests/waku_store_sync/sync_utils.nim | 2 + tests/waku_store_sync/test_protocol.nim | 444 ++++++++++++++++-- .../waku_store_sync/test_state_transition.nim | 249 ++++++++++ 3 files changed, 660 insertions(+), 35 deletions(-) create mode 100644 tests/waku_store_sync/test_state_transition.nim diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index e7fd82b57..d5cb601a2 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -26,6 +26,7 @@ proc newTestWakuRecon*( wantsTx: AsyncQueue[PeerId], needsTx: AsyncQueue[(PeerId, Fingerprint)], cluster: uint16 = 1, + syncRange: timer.Duration = DefaultSyncRange, shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7], ): Future[SyncReconciliation] {.async.} = let peerManager = PeerManager.new(switch) @@ -36,6 +37,7 @@ proc newTestWakuRecon*( peerManager = peerManager, wakuArchive = nil, relayJitter = 0.seconds, + syncRange = syncRange, idsRx = idsRx, localWantsTx = wantsTx, remoteNeedsTx = needsTx, diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index efdd6a885..ecb50250b 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -1,8 +1,12 @@ {.used.} import - std/[options, sets, random, math], testutils/unittests, chronos, libp2p/crypto/crypto - + std/[options, sets, random, math, algorithm], + testutils/unittests, + chronos, + libp2p/crypto/crypto +import chronos, chronos/asyncsync +import nimcrypto import ../../waku/[ node/peer_manager, @@ -21,6 +25,15 @@ import ../waku_archive/archive_utils, ./sync_utils +proc collectDiffs*( + chan: var Channel[SyncID], diffCount: int +): HashSet[WakuMessageHash] = + var received: HashSet[WakuMessageHash] + while received.len < diffCount: + let sid = chan.recv() # synchronous receive + received.incl sid.hash + result = received + suite "Waku Sync: reconciliation": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch @@ -234,53 +247,377 @@ suite "Waku Sync: reconciliation": remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true asyncTest "sync 2 nodes 10K msgs 1K diffs": - let msgCount = 10_000 - var diffCount = 1_000 + const + msgCount = 200_000 # total messages on the server + diffCount = 100 # messages initially missing on the client - var diffMsgHashes: HashSet[WakuMessageHash] - var randIndexes: HashSet[int] + ## ── choose which messages will be absent from the client ───────────── + var missingIdx: HashSet[int] + while missingIdx.len < diffCount: + missingIdx.incl rand(0 ..< msgCount) - # Diffs - for i in 0 ..< diffCount: - var randInt = rand(0 ..< msgCount) - - #make sure we actually have the right number of diffs - while randInt in randIndexes: - randInt = rand(0 ..< msgCount) - - randIndexes.incl(randInt) - - # sync window is 1 hour, spread msg equally in that time - let timeSlice = calculateTimeRange() - let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) - let (part, _) = divmod(timeWindow, 100_000) - - var timestamp = timeSlice.a + ## ── generate messages and pre-load the two reconcilers ─────────────── + let slice = calculateTimeRange() # 1-hour window + let step = (int64(slice.b) - int64(slice.a)) div msgCount + var ts = slice.a for i in 0 ..< msgCount: let - msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) - hash = computeMessageHash(DefaultPubsubTopic, msg) + msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + h = computeMessageHash(DefaultPubsubTopic, msg) - server.messageIngress(hash, msg) + server.messageIngress(h, msg) # every msg is on the server + if i notin missingIdx: + client.messageIngress(h, msg) # all but 100 are on the client + ts += Timestamp(step) - if i in randIndexes: - diffMsgHashes.incl(hash) + ## ── sanity before we start the round ───────────────────────────────── + check remoteNeeds.len == 0 + + ## ── launch reconciliation from the client towards the server ───────── + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + ## ── verify that ≈100 diffs were queued (allow 10 % slack) ──────────── + check remoteNeeds.len >= 90 # ≈ 100 × 0.9 + + asyncTest "sync 2 nodes 400K msgs 100k diffs": + const + msgCount = 400_000 + diffCount = 100_000 + tol = 1000 + + var diffMsgHashes: HashSet[WakuMessageHash] + var missingIdx: HashSet[int] + while missingIdx.len < diffCount: + missingIdx.incl rand(0 ..< msgCount) + + let slice = calculateTimeRange() + let step = (int64(slice.b) - int64(slice.a)) div msgCount + var ts = slice.a + + for i in 0 ..< msgCount: + let + msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + h = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(h, msg) + if i notin missingIdx: + client.messageIngress(h, msg) else: - client.messageIngress(hash, msg) + diffMsgHashes.incl h - timestamp += Timestamp(part) - continue + ts += Timestamp(step) - check: - remoteNeeds.len == 0 + check remoteNeeds.len == 0 let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error - # timimg issue make it hard to match exact numbers - check: - remoteNeeds.len > 900 + check remoteNeeds.len >= diffCount - tol and remoteNeeds.len < diffCount + let (_, deliveredHash) = await remoteNeeds.get() + check deliveredHash in diffMsgHashes + + asyncTest "sync 2 nodes 100 msgs 20 diff – 1-second window": + const + msgCount = 100 + diffCount = 20 + + var missingIdx: seq[int] = @[] + while missingIdx.len < diffCount: + let n = rand(0 ..< msgCount) + if n notin missingIdx: + missingIdx.add n + + var diffMsgHashes: HashSet[WakuMessageHash] + + let sliceEnd = now() + let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64 + let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount + var ts = sliceStart + + for i in 0 ..< msgCount: + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + + if i in missingIdx: + diffMsgHashes.incl hash + else: + client.messageIngress(hash, msg) + + ts += Timestamp(step) + + check remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == diffCount + + for _ in 0 ..< diffCount: + let (_, deliveredHash) = await remoteNeeds.get() + check deliveredHash in diffMsgHashes + + asyncTest "sync 2 nodes 500k msgs 300k diff – stress window": + const + msgCount = 500_000 + diffCount = 300_000 + + randomize() + var allIdx = newSeq[int](msgCount) + for i in 0 ..< msgCount: + allIdx[i] = i + shuffle(allIdx) + + let missingIdx = allIdx[0 ..< diffCount] + var missingSet: HashSet[int] + for idx in missingIdx: + missingSet.incl idx + + var diffMsgHashes: HashSet[WakuMessageHash] + + let sliceEnd = now() + let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64 + let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount + var ts = sliceStart + + for i in 0 ..< msgCount: + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + + if i in missingSet: + diffMsgHashes.incl hash + else: + client.messageIngress(hash, msg) + + ts += Timestamp(step) + + check remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == diffCount + + for _ in 0 ..< 1000: + let (_, deliveredHash) = await remoteNeeds.get() + check deliveredHash in diffMsgHashes + + asyncTest "sync 2 nodes, 40 msgs: 20 in-window diff, 20 out-window ignored": + const + diffInWin = 20 + diffOutWin = 20 + stepOutNs = 100_000_000'u64 + outOffsetNs = 2_000_000_000'u64 # for 20 mesg they sent 2 seconds earlier + + randomize() + + let nowNs = getNowInNanosecondTime() + let sliceStart = Timestamp(uint64(nowNs) - 700_000_000'u64) + let sliceEnd = nowNs + let stepIn = (sliceEnd.int64 - sliceStart.int64) div diffInWin + + let oldStart = Timestamp(uint64(sliceStart) - outOffsetNs) + let stepOut = Timestamp(stepOutNs) + + var inWinHashes, outWinHashes: HashSet[WakuMessageHash] + + var ts = sliceStart + for _ in 0 ..< diffInWin: + let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + inWinHashes.incl hash + ts += Timestamp(stepIn) + + ts = oldStart + for _ in 0 ..< diffOutWin: + let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + server.messageIngress(hash, msg) + outWinHashes.incl hash + ts += Timestamp(stepOut) + + check remoteNeeds.len == 0 + + let oneSec = timer.seconds(1) + + server = await newTestWakuRecon( + serverSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec + ) + + client = await newTestWakuRecon( + clientSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec + ) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == diffInWin + + for _ in 0 ..< diffInWin: + let (_, deliveredHashes) = await remoteNeeds.get() + check deliveredHashes in inWinHashes + check deliveredHashes notin outWinHashes + + asyncTest "hash-fingerprint collision, same timestamp – stable sort": + let ts = Timestamp(getNowInNanosecondTime()) + + var msg1 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + var msg2 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + msg2.payload[0] = msg2.payload[0] xor 0x01 + echo msg2 + var h1 = computeMessageHash(DefaultPubsubTopic, msg1) + var h2 = computeMessageHash(DefaultPubsubTopic, msg2) + + for i in 0 ..< 8: + h2[i] = h1[i] + for i in 0 ..< 8: + check h1[i] == h2[i] + + check h1 != h2 + + server.messageIngress(h1, msg1) + client.messageIngress(h2, msg2) + + check remoteNeeds.len == 0 + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + + var vec = @[SyncID(time: ts, hash: h2), SyncID(time: ts, hash: h1)] + vec.shuffle() + vec.sort() + + let hFirst = vec[0].hash + let hSecond = vec[1].hash + check vec[0].time == ts and vec[1].time == ts + + asyncTest "malformed message-ID is ignored during reconciliation": + let nowTs = Timestamp(getNowInNanosecondTime()) + + let goodMsg = fakeWakuMessage(ts = nowTs, contentTopic = DefaultContentTopic) + var goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg) + + var badHash: WakuMessageHash + for i in 0 ..< 32: + badHash[i] = 0'u8 + let badMsg = fakeWakuMessage(ts = Timestamp(0), contentTopic = DefaultContentTopic) + + server.messageIngress(goodHash, goodMsg) + server.messageIngress(badHash, badMsg) + + check remoteNeeds.len == 0 + + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + let (_, neededHash) = await remoteNeeds.get() + check neededHash == goodHash + check neededHash != badHash + + asyncTest "malformed ID: future-timestamp msg is ignored": + let nowNs = getNowInNanosecondTime() + let tsNow = Timestamp(nowNs) + + let goodMsg = fakeWakuMessage(ts = tsNow, contentTopic = DefaultContentTopic) + let goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg) + + const tenYearsSec = 10 * 365 * 24 * 60 * 60 + let futureNs = nowNs + int64(tenYearsSec) * 1_000_000_000'i64 + let badTs = Timestamp(futureNs.uint64) + + let badMsg = fakeWakuMessage(ts = badTs, contentTopic = DefaultContentTopic) + let badHash = computeMessageHash(DefaultPubsubTopic, badMsg) + + server.messageIngress(goodHash, goodMsg) + server.messageIngress(badHash, badMsg) + + check remoteNeeds.len == 0 + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + let (_, neededHash) = await remoteNeeds.get() + check neededHash == goodHash + check neededHash != badHash + + asyncTest "duplicate ID is queued only once": + let ts = Timestamp(getNowInNanosecondTime()) + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let h = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(h, msg) + server.messageIngress(h, msg) + check remoteNeeds.len == 0 + + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check remoteNeeds.len == 1 + let (_, neededHash) = await remoteNeeds.get() + check neededHash == h + + asyncTest "sync terminates immediately when no diffs exist": + let ts = Timestamp(getNowInNanosecondTime()) + let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(hash, msg) + client.messageIngress(hash, msg) + + let idsQ = newAsyncQueue[SyncID]() + let wantsQ = newAsyncQueue[PeerId]() + let needsQ = newAsyncQueue[(PeerId, Fingerprint)]() + + server = await newTestWakuRecon(serverSwitch, idsQ, wantsQ, needsQ) + client = await newTestWakuRecon(clientSwitch, idsQ, wantsQ, needsQ) + + defer: + server.stop() + client.stop() + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check needsQ.len == 0 suite "Waku Sync: transfer": var @@ -396,3 +733,40 @@ suite "Waku Sync: transfer": check: response.messages.len > 0 + + asyncTest "Check the exact missing messages are received": + let timeSlice = calculateTimeRange() + let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) + let (part, _) = divmod(timeWindow, 3) + + var ts = timeSlice.a + + let msgA = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + ts += Timestamp(part) + let msgB = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + ts += Timestamp(part) + let msgC = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic) + + let hA = computeMessageHash(DefaultPubsubTopic, msgA) + let hB = computeMessageHash(DefaultPubsubTopic, msgB) + let hC = computeMessageHash(DefaultPubsubTopic, msgC) + + discard serverDriver.put(DefaultPubsubTopic, @[msgA, msgB, msgC]) + discard clientDriver.put(DefaultPubsubTopic, @[msgA]) + + await serverRemoteNeeds.put((clientPeerInfo.peerId, hB)) + await serverRemoteNeeds.put((clientPeerInfo.peerId, hC)) + await clientLocalWants.put(serverPeerInfo.peerId) + + await sleepAsync(1.seconds) + check serverRemoteNeeds.len == 0 + + let sid1 = await clientIds.get() + let sid2 = await clientIds.get() + + let received = [sid1.hash, sid2.hash].toHashSet() + let expected = [hB, hC].toHashSet + + check received == expected + + check clientIds.len == 0 diff --git a/tests/waku_store_sync/test_state_transition.nim b/tests/waku_store_sync/test_state_transition.nim new file mode 100644 index 000000000..08ec9a91e --- /dev/null +++ b/tests/waku_store_sync/test_state_transition.nim @@ -0,0 +1,249 @@ +import unittest, nimcrypto, std/sequtils +import ../../waku/waku_store_sync/[reconciliation, common] +import ../../waku/waku_store_sync/storage/seq_storage +import ../../waku/waku_core/message/digest + +proc toDigest*(s: string): WakuMessageHash = + let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, s.high)) + for i in 0 .. 31: + result[i] = d.data[i] + +proc `..`(a, b: SyncID): Slice[SyncID] = + Slice[SyncID](a: a, b: b) + +suite "Waku Sync – reconciliation": + test "Fingerprint → ItemSet → zero (default thresholds)": + const N = 2_000 + const idx = 137 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseH, altH: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + var hr = h + if i == idx: + baseH = h + altH = toDigest("msg" & $i & "x") + hr = altH + discard remote.insert(SyncID(time: ts, hash: hr)) + + var z: WakuMessageHash + let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z) + + var s1, r1: seq[WakuMessageHash] + let p1 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(whole, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(whole)], + itemSets: @[], + ) + let rep1 = local.processPayload(p1, s1, r1) + check rep1.ranges.len == 8 + check rep1.ranges.allIt(it[1] == RangeType.Fingerprint) + + let mismT = 1000 + idx + let sub = + rep1.ranges.filterIt(mismT >= it[0].a.time and mismT <= it[0].b.time)[0][0] + + var s2, r2: seq[WakuMessageHash] + let p2 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sub, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(sub)], + itemSets: @[], + ) + let rep2 = local.processPayload(p2, s2, r2) + check rep2.ranges.len == 8 + check rep2.ranges.allIt(it[1] == RangeType.ItemSet) + + var s3, r3: seq[WakuMessageHash] + discard remote.processPayload(rep2, s3, r3) + check s3.len == 1 and s3[0] == altH + check r3.len == 1 and r3[0] == baseH + + discard local.insert(SyncID(time: mismT, hash: altH)) + discard remote.insert(SyncID(time: mismT, hash: baseH)) + + var s4, r4: seq[WakuMessageHash] + let p3 = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sub, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(sub)], + itemSets: @[], + ) + let rep3 = local.processPayload(p3, s4, r4) + check rep3.ranges.len == 0 + check s4.len == 0 and r4.len == 0 + + test "test 2 ranges includes 1 skip": + const N = 120 + const pivot = 60 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var diffHash: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + var hr: WakuMessageHash + if i >= pivot: + diffHash = toDigest("msg" & $i & "_x") + hr = diffHash + else: + hr = h + + discard remote.insert(SyncID(time: ts, hash: hr)) + + var z: WakuMessageHash + let sliceA = SyncID(time: 1000, hash: z) .. SyncID(time: 1059, hash: z) + let sliceB = SyncID(time: 1060, hash: z) .. SyncID(time: 1119, hash: z) + + var s, r: seq[WakuMessageHash] + let payload = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(sliceA, RangeType.Fingerprint), (sliceB, RangeType.Fingerprint)], + fingerprints: + @[remote.computeFingerprint(sliceA), remote.computeFingerprint(sliceB)], + itemSets: @[], + ) + let reply = local.processPayload(payload, s, r) + + check reply.ranges.len == 2 + check reply.ranges[0][1] == RangeType.Skip + check reply.ranges[1][1] == RangeType.ItemSet + check reply.itemSets.len == 1 + check not reply.itemSets[0].elements.anyIt(it.hash == diffHash) + + test "custom threshold (50) → eight ItemSets first round": + const N = 300 + const idx = 123 + + let local = SeqStorage.new(capacity = N, threshold = 50, partitions = 8) + let remote = SeqStorage.new(capacity = N, threshold = 50, partitions = 8) + + var baseH, altH: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + var hr = h + if i == idx: + baseH = h + altH = toDigest("msg" & $i & "_x") + hr = altH + discard remote.insert(SyncID(time: ts, hash: hr)) + + var z: WakuMessageHash + let slice = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z) + + var toS, toR: seq[WakuMessageHash] + let p = RangesData( + cluster: 0, + shards: @[0], + ranges: @[(slice, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(slice)], + itemSets: @[], + ) + let reply = local.processPayload(p, toS, toR) + + check reply.ranges.len == 8 + check reply.ranges.allIt(it[1] == RangeType.ItemSet) + check reply.itemSets.len == 8 + + let mismT = 1000 + idx + var hit = 0 + for ist in reply.itemSets: + if ist.elements.anyIt(it.time == mismT and it.hash == baseH): + inc hit + check hit == 1 + + test "test N=80K,3FP,2IS,SKIP": + const N = 80_000 + const bad = N - 10 + + let local = SeqStorage.new(@[]) + let remote = SeqStorage.new(@[]) + + var baseH, altH: WakuMessageHash + for i in 0 ..< N: + let ts = 1000 + i + let h = toDigest("msg" & $i) + discard local.insert(SyncID(time: ts, hash: h)) + + let hr = + if i == bad: + baseH = h + altH = toDigest("msg" & $i & "_x") + altH + else: + h + discard remote.insert(SyncID(time: ts, hash: hr)) + + var slice = + SyncID(time: 1000, hash: EmptyFingerprint) .. + SyncID(time: 1000 + N - 1, hash: FullFingerprint) + + proc fpReply(s: Slice[SyncID], sendQ, recvQ: var seq[WakuMessageHash]): RangesData = + local.processPayload( + RangesData( + cluster: 0, + shards: @[0], + ranges: @[(s, RangeType.Fingerprint)], + fingerprints: @[remote.computeFingerprint(s)], + itemSets: @[], + ), + sendQ, + recvQ, + ) + + var tmpS, tmpR: seq[WakuMessageHash] + + for r in 1 .. 3: + let rep = fpReply(slice, tmpS, tmpR) + echo "R{r} len={rep.ranges.len} expecting 8 FP" + check rep.ranges.len == 8 + check rep.ranges.allIt(it[1] == RangeType.Fingerprint) + for (sl, _) in rep.ranges: + if local.computeFingerprint(sl) != remote.computeFingerprint(sl): + slice = sl + break + + let rep4 = fpReply(slice, tmpS, tmpR) + echo "R4 len={rep4.ranges.len} expecting 8 IS" + check rep4.ranges.len == 8 + check rep4.ranges.allIt(it[1] == RangeType.ItemSet) + for (sl, _) in rep4.ranges: + if sl.a.time <= 1000 + bad and sl.b.time >= 1000 + bad: + slice = sl + break + + var send5, recv5: seq[WakuMessageHash] + let rep5 = fpReply(slice, send5, recv5) + echo "R5 len={rep5.ranges.len} expecting 1 IS" + check rep5.ranges.len == 1 + check rep5.ranges[0][1] == RangeType.ItemSet + + var qSend, qRecv: seq[WakuMessageHash] + discard remote.processPayload(rep5, qSend, qRecv) + echo "queue send={qSend.len} recv={qRecv.len}" + check qSend.len == 1 and qSend[0] == altH + check qRecv.len == 1 and qRecv[0] == baseH + + discard local.insert(SyncID(time: slice.a.time, hash: altH)) + discard remote.insert(SyncID(time: slice.a.time, hash: baseH)) + + var send6, recv6: seq[WakuMessageHash] + let rep6 = fpReply(slice, send6, recv6) + echo "R6 len={rep6.ranges.len} expecting 0" + check rep6.ranges.len == 0 + check send6.len == 0 and recv6.len == 0