mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
test: Waku sync tests part2 (#3397)
* Revert "Revert "Add finger print tests"" This reverts commit 36066311f91da31ca69fef3fa327d5e7fda7e50c. * Add state transition test * Add last test for state transition * Add new tests to transfer protocol * Add stree test scenarios * Add stress tests and edge scenarios * Add test outside sync window * Add edge tests * Add last corner test * Apply linters on files
This commit is contained in:
parent
d01dd9959c
commit
d41179e562
@ -26,6 +26,7 @@ proc newTestWakuRecon*(
|
||||
wantsTx: AsyncQueue[PeerId],
|
||||
needsTx: AsyncQueue[(PeerId, Fingerprint)],
|
||||
cluster: uint16 = 1,
|
||||
syncRange: timer.Duration = DefaultSyncRange,
|
||||
shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7],
|
||||
): Future[SyncReconciliation] {.async.} =
|
||||
let peerManager = PeerManager.new(switch)
|
||||
@ -36,6 +37,7 @@ proc newTestWakuRecon*(
|
||||
peerManager = peerManager,
|
||||
wakuArchive = nil,
|
||||
relayJitter = 0.seconds,
|
||||
syncRange = syncRange,
|
||||
idsRx = idsRx,
|
||||
localWantsTx = wantsTx,
|
||||
remoteNeedsTx = needsTx,
|
||||
|
||||
@ -1,8 +1,12 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sets, random, math], testutils/unittests, chronos, libp2p/crypto/crypto
|
||||
|
||||
std/[options, sets, random, math, algorithm],
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
libp2p/crypto/crypto
|
||||
import chronos, chronos/asyncsync
|
||||
import nimcrypto
|
||||
import
|
||||
../../waku/[
|
||||
node/peer_manager,
|
||||
@ -21,6 +25,15 @@ import
|
||||
../waku_archive/archive_utils,
|
||||
./sync_utils
|
||||
|
||||
proc collectDiffs*(
|
||||
chan: var Channel[SyncID], diffCount: int
|
||||
): HashSet[WakuMessageHash] =
|
||||
var received: HashSet[WakuMessageHash]
|
||||
while received.len < diffCount:
|
||||
let sid = chan.recv() # synchronous receive
|
||||
received.incl sid.hash
|
||||
result = received
|
||||
|
||||
suite "Waku Sync: reconciliation":
|
||||
var serverSwitch {.threadvar.}: Switch
|
||||
var clientSwitch {.threadvar.}: Switch
|
||||
@ -234,53 +247,377 @@ suite "Waku Sync: reconciliation":
|
||||
remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true
|
||||
|
||||
asyncTest "sync 2 nodes 10K msgs 1K diffs":
|
||||
let msgCount = 10_000
|
||||
var diffCount = 1_000
|
||||
const
|
||||
msgCount = 200_000 # total messages on the server
|
||||
diffCount = 100 # messages initially missing on the client
|
||||
|
||||
var diffMsgHashes: HashSet[WakuMessageHash]
|
||||
var randIndexes: HashSet[int]
|
||||
## ── choose which messages will be absent from the client ─────────────
|
||||
var missingIdx: HashSet[int]
|
||||
while missingIdx.len < diffCount:
|
||||
missingIdx.incl rand(0 ..< msgCount)
|
||||
|
||||
# Diffs
|
||||
for i in 0 ..< diffCount:
|
||||
var randInt = rand(0 ..< msgCount)
|
||||
|
||||
#make sure we actually have the right number of diffs
|
||||
while randInt in randIndexes:
|
||||
randInt = rand(0 ..< msgCount)
|
||||
|
||||
randIndexes.incl(randInt)
|
||||
|
||||
# sync window is 1 hour, spread msg equally in that time
|
||||
let timeSlice = calculateTimeRange()
|
||||
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
|
||||
let (part, _) = divmod(timeWindow, 100_000)
|
||||
|
||||
var timestamp = timeSlice.a
|
||||
## ── generate messages and pre-load the two reconcilers ───────────────
|
||||
let slice = calculateTimeRange() # 1-hour window
|
||||
let step = (int64(slice.b) - int64(slice.a)) div msgCount
|
||||
var ts = slice.a
|
||||
|
||||
for i in 0 ..< msgCount:
|
||||
let
|
||||
msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic)
|
||||
hash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
h = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
|
||||
server.messageIngress(hash, msg)
|
||||
server.messageIngress(h, msg) # every msg is on the server
|
||||
if i notin missingIdx:
|
||||
client.messageIngress(h, msg) # all but 100 are on the client
|
||||
ts += Timestamp(step)
|
||||
|
||||
if i in randIndexes:
|
||||
diffMsgHashes.incl(hash)
|
||||
## ── sanity before we start the round ─────────────────────────────────
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
## ── launch reconciliation from the client towards the server ─────────
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
## ── verify that ≈100 diffs were queued (allow 10 % slack) ────────────
|
||||
check remoteNeeds.len >= 90 # ≈ 100 × 0.9
|
||||
|
||||
asyncTest "sync 2 nodes 400K msgs 100k diffs":
|
||||
const
|
||||
msgCount = 400_000
|
||||
diffCount = 100_000
|
||||
tol = 1000
|
||||
|
||||
var diffMsgHashes: HashSet[WakuMessageHash]
|
||||
var missingIdx: HashSet[int]
|
||||
while missingIdx.len < diffCount:
|
||||
missingIdx.incl rand(0 ..< msgCount)
|
||||
|
||||
let slice = calculateTimeRange()
|
||||
let step = (int64(slice.b) - int64(slice.a)) div msgCount
|
||||
var ts = slice.a
|
||||
|
||||
for i in 0 ..< msgCount:
|
||||
let
|
||||
msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
h = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
|
||||
server.messageIngress(h, msg)
|
||||
if i notin missingIdx:
|
||||
client.messageIngress(h, msg)
|
||||
else:
|
||||
client.messageIngress(hash, msg)
|
||||
diffMsgHashes.incl h
|
||||
|
||||
timestamp += Timestamp(part)
|
||||
continue
|
||||
ts += Timestamp(step)
|
||||
|
||||
check:
|
||||
remoteNeeds.len == 0
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
# timimg issue make it hard to match exact numbers
|
||||
check:
|
||||
remoteNeeds.len > 900
|
||||
check remoteNeeds.len >= diffCount - tol and remoteNeeds.len < diffCount
|
||||
let (_, deliveredHash) = await remoteNeeds.get()
|
||||
check deliveredHash in diffMsgHashes
|
||||
|
||||
asyncTest "sync 2 nodes 100 msgs 20 diff – 1-second window":
|
||||
const
|
||||
msgCount = 100
|
||||
diffCount = 20
|
||||
|
||||
var missingIdx: seq[int] = @[]
|
||||
while missingIdx.len < diffCount:
|
||||
let n = rand(0 ..< msgCount)
|
||||
if n notin missingIdx:
|
||||
missingIdx.add n
|
||||
|
||||
var diffMsgHashes: HashSet[WakuMessageHash]
|
||||
|
||||
let sliceEnd = now()
|
||||
let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64
|
||||
let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount
|
||||
var ts = sliceStart
|
||||
|
||||
for i in 0 ..< msgCount:
|
||||
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
let hash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
server.messageIngress(hash, msg)
|
||||
|
||||
if i in missingIdx:
|
||||
diffMsgHashes.incl hash
|
||||
else:
|
||||
client.messageIngress(hash, msg)
|
||||
|
||||
ts += Timestamp(step)
|
||||
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == diffCount
|
||||
|
||||
for _ in 0 ..< diffCount:
|
||||
let (_, deliveredHash) = await remoteNeeds.get()
|
||||
check deliveredHash in diffMsgHashes
|
||||
|
||||
asyncTest "sync 2 nodes 500k msgs 300k diff – stress window":
|
||||
const
|
||||
msgCount = 500_000
|
||||
diffCount = 300_000
|
||||
|
||||
randomize()
|
||||
var allIdx = newSeq[int](msgCount)
|
||||
for i in 0 ..< msgCount:
|
||||
allIdx[i] = i
|
||||
shuffle(allIdx)
|
||||
|
||||
let missingIdx = allIdx[0 ..< diffCount]
|
||||
var missingSet: HashSet[int]
|
||||
for idx in missingIdx:
|
||||
missingSet.incl idx
|
||||
|
||||
var diffMsgHashes: HashSet[WakuMessageHash]
|
||||
|
||||
let sliceEnd = now()
|
||||
let sliceStart = Timestamp uint64(sliceEnd) - 1_000_000_000'u64
|
||||
let step = (int64(sliceEnd) - int64(sliceStart)) div msgCount
|
||||
var ts = sliceStart
|
||||
|
||||
for i in 0 ..< msgCount:
|
||||
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
let hash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
server.messageIngress(hash, msg)
|
||||
|
||||
if i in missingSet:
|
||||
diffMsgHashes.incl hash
|
||||
else:
|
||||
client.messageIngress(hash, msg)
|
||||
|
||||
ts += Timestamp(step)
|
||||
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == diffCount
|
||||
|
||||
for _ in 0 ..< 1000:
|
||||
let (_, deliveredHash) = await remoteNeeds.get()
|
||||
check deliveredHash in diffMsgHashes
|
||||
|
||||
asyncTest "sync 2 nodes, 40 msgs: 20 in-window diff, 20 out-window ignored":
|
||||
const
|
||||
diffInWin = 20
|
||||
diffOutWin = 20
|
||||
stepOutNs = 100_000_000'u64
|
||||
outOffsetNs = 2_000_000_000'u64 # for 20 mesg they sent 2 seconds earlier
|
||||
|
||||
randomize()
|
||||
|
||||
let nowNs = getNowInNanosecondTime()
|
||||
let sliceStart = Timestamp(uint64(nowNs) - 700_000_000'u64)
|
||||
let sliceEnd = nowNs
|
||||
let stepIn = (sliceEnd.int64 - sliceStart.int64) div diffInWin
|
||||
|
||||
let oldStart = Timestamp(uint64(sliceStart) - outOffsetNs)
|
||||
let stepOut = Timestamp(stepOutNs)
|
||||
|
||||
var inWinHashes, outWinHashes: HashSet[WakuMessageHash]
|
||||
|
||||
var ts = sliceStart
|
||||
for _ in 0 ..< diffInWin:
|
||||
let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic)
|
||||
let hash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
server.messageIngress(hash, msg)
|
||||
inWinHashes.incl hash
|
||||
ts += Timestamp(stepIn)
|
||||
|
||||
ts = oldStart
|
||||
for _ in 0 ..< diffOutWin:
|
||||
let msg = fakeWakuMessage(ts = Timestamp ts, contentTopic = DefaultContentTopic)
|
||||
let hash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
server.messageIngress(hash, msg)
|
||||
outWinHashes.incl hash
|
||||
ts += Timestamp(stepOut)
|
||||
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
let oneSec = timer.seconds(1)
|
||||
|
||||
server = await newTestWakuRecon(
|
||||
serverSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec
|
||||
)
|
||||
|
||||
client = await newTestWakuRecon(
|
||||
clientSwitch, idsChannel, localWants, remoteNeeds, syncRange = oneSec
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == diffInWin
|
||||
|
||||
for _ in 0 ..< diffInWin:
|
||||
let (_, deliveredHashes) = await remoteNeeds.get()
|
||||
check deliveredHashes in inWinHashes
|
||||
check deliveredHashes notin outWinHashes
|
||||
|
||||
asyncTest "hash-fingerprint collision, same timestamp – stable sort":
|
||||
let ts = Timestamp(getNowInNanosecondTime())
|
||||
|
||||
var msg1 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
var msg2 = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
msg2.payload[0] = msg2.payload[0] xor 0x01
|
||||
echo msg2
|
||||
var h1 = computeMessageHash(DefaultPubsubTopic, msg1)
|
||||
var h2 = computeMessageHash(DefaultPubsubTopic, msg2)
|
||||
|
||||
for i in 0 ..< 8:
|
||||
h2[i] = h1[i]
|
||||
for i in 0 ..< 8:
|
||||
check h1[i] == h2[i]
|
||||
|
||||
check h1 != h2
|
||||
|
||||
server.messageIngress(h1, msg1)
|
||||
client.messageIngress(h2, msg2)
|
||||
|
||||
check remoteNeeds.len == 0
|
||||
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
|
||||
|
||||
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == 1
|
||||
|
||||
var vec = @[SyncID(time: ts, hash: h2), SyncID(time: ts, hash: h1)]
|
||||
vec.shuffle()
|
||||
vec.sort()
|
||||
|
||||
let hFirst = vec[0].hash
|
||||
let hSecond = vec[1].hash
|
||||
check vec[0].time == ts and vec[1].time == ts
|
||||
|
||||
asyncTest "malformed message-ID is ignored during reconciliation":
|
||||
let nowTs = Timestamp(getNowInNanosecondTime())
|
||||
|
||||
let goodMsg = fakeWakuMessage(ts = nowTs, contentTopic = DefaultContentTopic)
|
||||
var goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg)
|
||||
|
||||
var badHash: WakuMessageHash
|
||||
for i in 0 ..< 32:
|
||||
badHash[i] = 0'u8
|
||||
let badMsg = fakeWakuMessage(ts = Timestamp(0), contentTopic = DefaultContentTopic)
|
||||
|
||||
server.messageIngress(goodHash, goodMsg)
|
||||
server.messageIngress(badHash, badMsg)
|
||||
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
|
||||
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == 1
|
||||
let (_, neededHash) = await remoteNeeds.get()
|
||||
check neededHash == goodHash
|
||||
check neededHash != badHash
|
||||
|
||||
asyncTest "malformed ID: future-timestamp msg is ignored":
|
||||
let nowNs = getNowInNanosecondTime()
|
||||
let tsNow = Timestamp(nowNs)
|
||||
|
||||
let goodMsg = fakeWakuMessage(ts = tsNow, contentTopic = DefaultContentTopic)
|
||||
let goodHash = computeMessageHash(DefaultPubsubTopic, goodMsg)
|
||||
|
||||
const tenYearsSec = 10 * 365 * 24 * 60 * 60
|
||||
let futureNs = nowNs + int64(tenYearsSec) * 1_000_000_000'i64
|
||||
let badTs = Timestamp(futureNs.uint64)
|
||||
|
||||
let badMsg = fakeWakuMessage(ts = badTs, contentTopic = DefaultContentTopic)
|
||||
let badHash = computeMessageHash(DefaultPubsubTopic, badMsg)
|
||||
|
||||
server.messageIngress(goodHash, goodMsg)
|
||||
server.messageIngress(badHash, badMsg)
|
||||
|
||||
check remoteNeeds.len == 0
|
||||
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
|
||||
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == 1
|
||||
let (_, neededHash) = await remoteNeeds.get()
|
||||
check neededHash == goodHash
|
||||
check neededHash != badHash
|
||||
|
||||
asyncTest "duplicate ID is queued only once":
|
||||
let ts = Timestamp(getNowInNanosecondTime())
|
||||
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
let h = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
|
||||
server.messageIngress(h, msg)
|
||||
server.messageIngress(h, msg)
|
||||
check remoteNeeds.len == 0
|
||||
|
||||
server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds)
|
||||
client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check remoteNeeds.len == 1
|
||||
let (_, neededHash) = await remoteNeeds.get()
|
||||
check neededHash == h
|
||||
|
||||
asyncTest "sync terminates immediately when no diffs exist":
|
||||
let ts = Timestamp(getNowInNanosecondTime())
|
||||
let msg = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
let hash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||
|
||||
server.messageIngress(hash, msg)
|
||||
client.messageIngress(hash, msg)
|
||||
|
||||
let idsQ = newAsyncQueue[SyncID]()
|
||||
let wantsQ = newAsyncQueue[PeerId]()
|
||||
let needsQ = newAsyncQueue[(PeerId, Fingerprint)]()
|
||||
|
||||
server = await newTestWakuRecon(serverSwitch, idsQ, wantsQ, needsQ)
|
||||
client = await newTestWakuRecon(clientSwitch, idsQ, wantsQ, needsQ)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
check needsQ.len == 0
|
||||
|
||||
suite "Waku Sync: transfer":
|
||||
var
|
||||
@ -396,3 +733,40 @@ suite "Waku Sync: transfer":
|
||||
|
||||
check:
|
||||
response.messages.len > 0
|
||||
|
||||
asyncTest "Check the exact missing messages are received":
|
||||
let timeSlice = calculateTimeRange()
|
||||
let timeWindow = int64(timeSlice.b) - int64(timeSlice.a)
|
||||
let (part, _) = divmod(timeWindow, 3)
|
||||
|
||||
var ts = timeSlice.a
|
||||
|
||||
let msgA = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
ts += Timestamp(part)
|
||||
let msgB = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
ts += Timestamp(part)
|
||||
let msgC = fakeWakuMessage(ts = ts, contentTopic = DefaultContentTopic)
|
||||
|
||||
let hA = computeMessageHash(DefaultPubsubTopic, msgA)
|
||||
let hB = computeMessageHash(DefaultPubsubTopic, msgB)
|
||||
let hC = computeMessageHash(DefaultPubsubTopic, msgC)
|
||||
|
||||
discard serverDriver.put(DefaultPubsubTopic, @[msgA, msgB, msgC])
|
||||
discard clientDriver.put(DefaultPubsubTopic, @[msgA])
|
||||
|
||||
await serverRemoteNeeds.put((clientPeerInfo.peerId, hB))
|
||||
await serverRemoteNeeds.put((clientPeerInfo.peerId, hC))
|
||||
await clientLocalWants.put(serverPeerInfo.peerId)
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
check serverRemoteNeeds.len == 0
|
||||
|
||||
let sid1 = await clientIds.get()
|
||||
let sid2 = await clientIds.get()
|
||||
|
||||
let received = [sid1.hash, sid2.hash].toHashSet()
|
||||
let expected = [hB, hC].toHashSet
|
||||
|
||||
check received == expected
|
||||
|
||||
check clientIds.len == 0
|
||||
|
||||
249
tests/waku_store_sync/test_state_transition.nim
Normal file
249
tests/waku_store_sync/test_state_transition.nim
Normal file
@ -0,0 +1,249 @@
|
||||
import unittest, nimcrypto, std/sequtils
|
||||
import ../../waku/waku_store_sync/[reconciliation, common]
|
||||
import ../../waku/waku_store_sync/storage/seq_storage
|
||||
import ../../waku/waku_core/message/digest
|
||||
|
||||
proc toDigest*(s: string): WakuMessageHash =
|
||||
let d = nimcrypto.keccak256.digest((s & "").toOpenArrayByte(0, s.high))
|
||||
for i in 0 .. 31:
|
||||
result[i] = d.data[i]
|
||||
|
||||
proc `..`(a, b: SyncID): Slice[SyncID] =
|
||||
Slice[SyncID](a: a, b: b)
|
||||
|
||||
suite "Waku Sync – reconciliation":
|
||||
test "Fingerprint → ItemSet → zero (default thresholds)":
|
||||
const N = 2_000
|
||||
const idx = 137
|
||||
|
||||
let local = SeqStorage.new(@[])
|
||||
let remote = SeqStorage.new(@[])
|
||||
|
||||
var baseH, altH: WakuMessageHash
|
||||
for i in 0 ..< N:
|
||||
let ts = 1000 + i
|
||||
let h = toDigest("msg" & $i)
|
||||
discard local.insert(SyncID(time: ts, hash: h))
|
||||
var hr = h
|
||||
if i == idx:
|
||||
baseH = h
|
||||
altH = toDigest("msg" & $i & "x")
|
||||
hr = altH
|
||||
discard remote.insert(SyncID(time: ts, hash: hr))
|
||||
|
||||
var z: WakuMessageHash
|
||||
let whole = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
|
||||
|
||||
var s1, r1: seq[WakuMessageHash]
|
||||
let p1 = RangesData(
|
||||
cluster: 0,
|
||||
shards: @[0],
|
||||
ranges: @[(whole, RangeType.Fingerprint)],
|
||||
fingerprints: @[remote.computeFingerprint(whole)],
|
||||
itemSets: @[],
|
||||
)
|
||||
let rep1 = local.processPayload(p1, s1, r1)
|
||||
check rep1.ranges.len == 8
|
||||
check rep1.ranges.allIt(it[1] == RangeType.Fingerprint)
|
||||
|
||||
let mismT = 1000 + idx
|
||||
let sub =
|
||||
rep1.ranges.filterIt(mismT >= it[0].a.time and mismT <= it[0].b.time)[0][0]
|
||||
|
||||
var s2, r2: seq[WakuMessageHash]
|
||||
let p2 = RangesData(
|
||||
cluster: 0,
|
||||
shards: @[0],
|
||||
ranges: @[(sub, RangeType.Fingerprint)],
|
||||
fingerprints: @[remote.computeFingerprint(sub)],
|
||||
itemSets: @[],
|
||||
)
|
||||
let rep2 = local.processPayload(p2, s2, r2)
|
||||
check rep2.ranges.len == 8
|
||||
check rep2.ranges.allIt(it[1] == RangeType.ItemSet)
|
||||
|
||||
var s3, r3: seq[WakuMessageHash]
|
||||
discard remote.processPayload(rep2, s3, r3)
|
||||
check s3.len == 1 and s3[0] == altH
|
||||
check r3.len == 1 and r3[0] == baseH
|
||||
|
||||
discard local.insert(SyncID(time: mismT, hash: altH))
|
||||
discard remote.insert(SyncID(time: mismT, hash: baseH))
|
||||
|
||||
var s4, r4: seq[WakuMessageHash]
|
||||
let p3 = RangesData(
|
||||
cluster: 0,
|
||||
shards: @[0],
|
||||
ranges: @[(sub, RangeType.Fingerprint)],
|
||||
fingerprints: @[remote.computeFingerprint(sub)],
|
||||
itemSets: @[],
|
||||
)
|
||||
let rep3 = local.processPayload(p3, s4, r4)
|
||||
check rep3.ranges.len == 0
|
||||
check s4.len == 0 and r4.len == 0
|
||||
|
||||
test "test 2 ranges includes 1 skip":
|
||||
const N = 120
|
||||
const pivot = 60
|
||||
|
||||
let local = SeqStorage.new(@[])
|
||||
let remote = SeqStorage.new(@[])
|
||||
|
||||
var diffHash: WakuMessageHash
|
||||
for i in 0 ..< N:
|
||||
let ts = 1000 + i
|
||||
let h = toDigest("msg" & $i)
|
||||
discard local.insert(SyncID(time: ts, hash: h))
|
||||
var hr: WakuMessageHash
|
||||
if i >= pivot:
|
||||
diffHash = toDigest("msg" & $i & "_x")
|
||||
hr = diffHash
|
||||
else:
|
||||
hr = h
|
||||
|
||||
discard remote.insert(SyncID(time: ts, hash: hr))
|
||||
|
||||
var z: WakuMessageHash
|
||||
let sliceA = SyncID(time: 1000, hash: z) .. SyncID(time: 1059, hash: z)
|
||||
let sliceB = SyncID(time: 1060, hash: z) .. SyncID(time: 1119, hash: z)
|
||||
|
||||
var s, r: seq[WakuMessageHash]
|
||||
let payload = RangesData(
|
||||
cluster: 0,
|
||||
shards: @[0],
|
||||
ranges: @[(sliceA, RangeType.Fingerprint), (sliceB, RangeType.Fingerprint)],
|
||||
fingerprints:
|
||||
@[remote.computeFingerprint(sliceA), remote.computeFingerprint(sliceB)],
|
||||
itemSets: @[],
|
||||
)
|
||||
let reply = local.processPayload(payload, s, r)
|
||||
|
||||
check reply.ranges.len == 2
|
||||
check reply.ranges[0][1] == RangeType.Skip
|
||||
check reply.ranges[1][1] == RangeType.ItemSet
|
||||
check reply.itemSets.len == 1
|
||||
check not reply.itemSets[0].elements.anyIt(it.hash == diffHash)
|
||||
|
||||
test "custom threshold (50) → eight ItemSets first round":
|
||||
const N = 300
|
||||
const idx = 123
|
||||
|
||||
let local = SeqStorage.new(capacity = N, threshold = 50, partitions = 8)
|
||||
let remote = SeqStorage.new(capacity = N, threshold = 50, partitions = 8)
|
||||
|
||||
var baseH, altH: WakuMessageHash
|
||||
for i in 0 ..< N:
|
||||
let ts = 1000 + i
|
||||
let h = toDigest("msg" & $i)
|
||||
discard local.insert(SyncID(time: ts, hash: h))
|
||||
var hr = h
|
||||
if i == idx:
|
||||
baseH = h
|
||||
altH = toDigest("msg" & $i & "_x")
|
||||
hr = altH
|
||||
discard remote.insert(SyncID(time: ts, hash: hr))
|
||||
|
||||
var z: WakuMessageHash
|
||||
let slice = SyncID(time: 1000, hash: z) .. SyncID(time: 1000 + N - 1, hash: z)
|
||||
|
||||
var toS, toR: seq[WakuMessageHash]
|
||||
let p = RangesData(
|
||||
cluster: 0,
|
||||
shards: @[0],
|
||||
ranges: @[(slice, RangeType.Fingerprint)],
|
||||
fingerprints: @[remote.computeFingerprint(slice)],
|
||||
itemSets: @[],
|
||||
)
|
||||
let reply = local.processPayload(p, toS, toR)
|
||||
|
||||
check reply.ranges.len == 8
|
||||
check reply.ranges.allIt(it[1] == RangeType.ItemSet)
|
||||
check reply.itemSets.len == 8
|
||||
|
||||
let mismT = 1000 + idx
|
||||
var hit = 0
|
||||
for ist in reply.itemSets:
|
||||
if ist.elements.anyIt(it.time == mismT and it.hash == baseH):
|
||||
inc hit
|
||||
check hit == 1
|
||||
|
||||
test "test N=80K,3FP,2IS,SKIP":
|
||||
const N = 80_000
|
||||
const bad = N - 10
|
||||
|
||||
let local = SeqStorage.new(@[])
|
||||
let remote = SeqStorage.new(@[])
|
||||
|
||||
var baseH, altH: WakuMessageHash
|
||||
for i in 0 ..< N:
|
||||
let ts = 1000 + i
|
||||
let h = toDigest("msg" & $i)
|
||||
discard local.insert(SyncID(time: ts, hash: h))
|
||||
|
||||
let hr =
|
||||
if i == bad:
|
||||
baseH = h
|
||||
altH = toDigest("msg" & $i & "_x")
|
||||
altH
|
||||
else:
|
||||
h
|
||||
discard remote.insert(SyncID(time: ts, hash: hr))
|
||||
|
||||
var slice =
|
||||
SyncID(time: 1000, hash: EmptyFingerprint) ..
|
||||
SyncID(time: 1000 + N - 1, hash: FullFingerprint)
|
||||
|
||||
proc fpReply(s: Slice[SyncID], sendQ, recvQ: var seq[WakuMessageHash]): RangesData =
|
||||
local.processPayload(
|
||||
RangesData(
|
||||
cluster: 0,
|
||||
shards: @[0],
|
||||
ranges: @[(s, RangeType.Fingerprint)],
|
||||
fingerprints: @[remote.computeFingerprint(s)],
|
||||
itemSets: @[],
|
||||
),
|
||||
sendQ,
|
||||
recvQ,
|
||||
)
|
||||
|
||||
var tmpS, tmpR: seq[WakuMessageHash]
|
||||
|
||||
for r in 1 .. 3:
|
||||
let rep = fpReply(slice, tmpS, tmpR)
|
||||
echo "R{r} len={rep.ranges.len} expecting 8 FP"
|
||||
check rep.ranges.len == 8
|
||||
check rep.ranges.allIt(it[1] == RangeType.Fingerprint)
|
||||
for (sl, _) in rep.ranges:
|
||||
if local.computeFingerprint(sl) != remote.computeFingerprint(sl):
|
||||
slice = sl
|
||||
break
|
||||
|
||||
let rep4 = fpReply(slice, tmpS, tmpR)
|
||||
echo "R4 len={rep4.ranges.len} expecting 8 IS"
|
||||
check rep4.ranges.len == 8
|
||||
check rep4.ranges.allIt(it[1] == RangeType.ItemSet)
|
||||
for (sl, _) in rep4.ranges:
|
||||
if sl.a.time <= 1000 + bad and sl.b.time >= 1000 + bad:
|
||||
slice = sl
|
||||
break
|
||||
|
||||
var send5, recv5: seq[WakuMessageHash]
|
||||
let rep5 = fpReply(slice, send5, recv5)
|
||||
echo "R5 len={rep5.ranges.len} expecting 1 IS"
|
||||
check rep5.ranges.len == 1
|
||||
check rep5.ranges[0][1] == RangeType.ItemSet
|
||||
|
||||
var qSend, qRecv: seq[WakuMessageHash]
|
||||
discard remote.processPayload(rep5, qSend, qRecv)
|
||||
echo "queue send={qSend.len} recv={qRecv.len}"
|
||||
check qSend.len == 1 and qSend[0] == altH
|
||||
check qRecv.len == 1 and qRecv[0] == baseH
|
||||
|
||||
discard local.insert(SyncID(time: slice.a.time, hash: altH))
|
||||
discard remote.insert(SyncID(time: slice.a.time, hash: baseH))
|
||||
|
||||
var send6, recv6: seq[WakuMessageHash]
|
||||
let rep6 = fpReply(slice, send6, recv6)
|
||||
echo "R6 len={rep6.ranges.len} expecting 0"
|
||||
check rep6.ranges.len == 0
|
||||
check send6.len == 0 and recv6.len == 0
|
||||
Loading…
x
Reference in New Issue
Block a user