feat: waku store sync 2.0 storage & tests (#3215)

This commit is contained in:
Simon-Pierre Vivier 2025-01-23 10:39:23 -05:00 committed by GitHub
parent 2eca003be0
commit f550c76eb1
7 changed files with 643 additions and 32 deletions

View File

@ -1,6 +1,6 @@
import std/[options, random], chronos, chronicles
import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore
import waku/[node/peer_manager, waku_core, waku_store_sync/common], ../testlib/wakucore
randomize()
@ -12,7 +12,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash =
return hash
proc newTestWakuRecon*(
#[ proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
@ -34,9 +34,9 @@ proc newTestWakuRecon*(
proto.start()
switch.mount(proto)
return proto
return proto ]#
proc newTestWakuTransfer*(
#[ proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
wantsRx: AsyncQueue[(PeerId, Fingerprint)],
@ -55,4 +55,4 @@ proc newTestWakuTransfer*(
proto.start()
switch.mount(proto)
return proto
return proto ]#

View File

@ -6,7 +6,6 @@ import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_core/time,
../../waku/waku_store_sync,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/codec,
./sync_utils
@ -19,12 +18,12 @@ proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet =
for i in 0 ..< count:
let diff = rng.rand(9.uint8) + 1
let timestamp = lastTime + diff * 1_000_000_000
let timestamp = lastTime + diff * 1_000
lastTime = timestamp
let hash = randomHash(rng)
let id = SyncID(time: Timestamp(timestamp), fingerprint: hash)
let id = SyncID(time: Timestamp(timestamp), hash: hash)
elements.add(id)
@ -40,8 +39,8 @@ proc randomSetRange(
ub = itemSet.elements[^1]
#for test check equality
lb.fingerprint = EmptyFingerprint
ub.fingerprint = EmptyFingerprint
lb.hash = EmptyWakuMessageHash
ub.hash = EmptyWakuMessageHash
let bounds = lb .. ub
@ -90,9 +89,9 @@ suite "Waku Store Sync Codec":
time = getNowInNanosecondTime()
let (bounds1, itemSet1) = randomSetRange(count, time, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 11_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 21_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 31_000_000, rng)
let range1 = (bounds1, RangeType.ItemSet)
let range2 = (bounds2, RangeType.ItemSet)
@ -128,12 +127,12 @@ suite "Waku Store Sync Codec":
ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)
for i in 0 ..< count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let lb = SyncID(time: Timestamp(lastTime), hash: EmptyWakuMessageHash)
let nowTime = lastTime + 10_000_000_000 # 10s
lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let ub = SyncID(time: Timestamp(nowTime), hash: EmptyWakuMessageHash)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)
@ -171,10 +170,10 @@ suite "Waku Store Sync Codec":
fingerprints = newSeqOfCap[Fingerprint](4)
for i in 1 .. count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let lb = SyncID(time: Timestamp(lastTime), hash: EmptyWakuMessageHash)
let nowTime = lastTime + 10_000_000_000 # 10s
lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let ub = SyncID(time: Timestamp(nowTime), hash: EmptyWakuMessageHash)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)

View File

@ -0,0 +1,204 @@
{.used.}
import std/[options, random], testutils/unittests, chronos
import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/storage/seq_storage,
./sync_utils
suite "Waku Sync Storage":
test "process hash range":
var rng = initRand()
let count = 10_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
var storage1 = SeqStorage.new(elements)
var storage2 = SeqStorage.new(elements)
let lb = elements[0]
let ub = elements[count - 1]
let bounds = lb .. ub
let fingerprint1 = storage1.computeFingerprint(bounds)
var outputPayload: RangesData
storage2.processFingerprintRange(bounds, fingerprint1, outputPayload)
let expected =
RangesData(ranges: @[(bounds, RangeType.Skip)], fingerprints: @[], itemSets: @[])
check:
outputPayload == expected
test "process item set range":
var rng = initRand()
let count = 1000
var elements1 = newSeqOfCap[SyncID](count)
var elements2 = newSeqOfCap[SyncID](count)
var diffs: seq[Fingerprint]
for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements1.add(id)
if rng.rand(0 .. 9) == 0:
elements2.add(id)
else:
diffs.add(id.hash)
var storage1 = SeqStorage.new(elements1)
let lb = elements1[0]
let ub = elements1[count - 1]
let bounds = lb .. ub
let itemSet2 = ItemSet(elements: elements2, reconciled: true)
var
toSend: seq[Fingerprint]
toRecv: seq[Fingerprint]
outputPayload: RangesData
storage1.processItemSetRange(bounds, itemSet2, toSend, toRecv, outputPayload)
check:
toSend == diffs
test "insert new element":
var rng = initRand()
let storage = SeqStorage.new(10)
let element1 = SyncID(time: Timestamp(1000), hash: randomHash(rng))
let element2 = SyncID(time: Timestamp(2000), hash: randomHash(rng))
let res1 = storage.insert(element1)
assert res1.isOk(), $res1.error
let count1 = storage.length()
let res2 = storage.insert(element2)
assert res2.isOk(), $res2.error
let count2 = storage.length()
check:
count1 == 1
count2 == 2
test "insert duplicate":
var rng = initRand()
let element = SyncID(time: Timestamp(1000), hash: randomHash(rng))
let storage = SeqStorage.new(@[element])
let res = storage.insert(element)
check:
res.isErr() == true
test "prune elements":
var rng = initRand()
let count = 1000
var elements = newSeqOfCap[SyncID](count)
for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
let storage = SeqStorage.new(elements)
let beforeCount = storage.length()
let pruned = storage.prune(Timestamp(500))
let afterCount = storage.length()
check:
beforeCount == 1000
pruned == 500
afterCount == 500
## disabled tests are rough benchmark
#[ test "10M fingerprint":
var rng = initRand()
let count = 10_000_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
let storage = SeqStorage.new(elements)
let before = getMonoTime()
discard storage.fingerprinting(some(0 .. count))
let after = getMonoTime()
echo "Fingerprint Time: " & $(after - before) ]#
#[ test "random inserts":
var rng = initRand()
let count = 10_000_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
var storage = SeqStorage.new(elements)
var avg: times.Duration
for i in 0 ..< 1000:
let newId =
SyncID(time: Timestamp(rng.rand(0 .. count)), hash: randomHash(rng))
let before = getMonoTime()
discard storage.insert(newId)
let after = getMonoTime()
avg += after - before
avg = avg div 1000
echo "Avg Time 1K Inserts: " & $avg ]#
#[ test "trim":
var rng = initRand()
let count = 10_000_000
var elements = newSeqOfCap[SyncID](count)
for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))
elements.add(id)
var storage = SeqStorage.new(elements)
let before = getMonoTime()
discard storage.trim(Timestamp(count div 4))
let after = getMonoTime()
echo "Trim Time: " & $(after - before) ]#

View File

@ -111,16 +111,10 @@ proc deltaEncode*(value: RangesData): seq[byte] =
return output
proc getItemSet(idx: var int, buffer: seq[byte], itemSetLength: int): ItemSet =
var itemSet = ItemSet()
let slice = buffer[idx ..< buffer.len]
let count = deltaDecode(itemSet, slice, itemSetLength)
idx += count
proc getItemSetLength(idx: var int, buffer: seq[byte]): int =
let min = min(idx + VarIntLen, buffer.len)
let slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
let (val, len) = uint64.fromBytes(slice, Leb128)
idx += len
return int(val)
@ -141,7 +135,12 @@ proc getRangeType(idx: var int, buffer: seq[byte]): Result[RangeType, string] =
if idx >= buffer.len:
return err("Cannot decode range type")
let rangeType = RangeType(buffer[idx])
let val = buffer[idx]
if val > 2 or val < 0:
return err("Cannot decode range type")
let rangeType = RangeType(val)
idx += 1
return ok(rangeType)
@ -151,6 +150,10 @@ proc updateHash(idx: var int, buffer: seq[byte], hash: var WakuMessageHash) =
return
let sameBytes = int(buffer[idx])
if sameBytes > 32:
return
idx += 1
if idx + sameBytes > buffer.len:
@ -165,7 +168,7 @@ proc updateHash(idx: var int, buffer: seq[byte], hash: var WakuMessageHash) =
proc getTimeDiff(idx: var int, buffer: seq[byte]): Timestamp =
let min = min(idx + VarIntLen, buffer.len)
let slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
let (val, len) = uint64.fromBytes(slice, Leb128)
idx += len
return Timestamp(val)
@ -175,7 +178,7 @@ proc getTimestamp(idx: var int, buffer: seq[byte]): Result[Timestamp, string] =
return err("Cannot decode timestamp")
let slice = buffer[idx ..< idx + VarIntLen]
(val, len) = uint64.fromBytes(slice, Leb128)
let (val, len) = uint64.fromBytes(slice, Leb128)
idx += len
return ok(Timestamp(val))
@ -196,7 +199,12 @@ proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] =
if idx >= buffer.len:
return err("Cannot decode reconciled")
let recon = bool(buffer[idx])
let val = buffer[idx]
if val > 1 or val < 0:
return err("Cannot decode reconciled")
let recon = bool(val)
idx += 1
return ok(recon)
@ -221,11 +229,21 @@ proc deltaDecode*(
itemSet.reconciled = ?getReconciled(idx, buffer)
return idx
return ok(idx)
proc getItemSet(
idx: var int, buffer: seq[byte], itemSetLength: int
): Result[ItemSet, string] =
var itemSet = ItemSet()
let slice = buffer[idx ..< buffer.len]
let count = ?deltaDecode(itemSet, slice, itemSetLength)
idx += count
return ok(itemSet)
proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
if buffer.len == 1:
return RangesData()
return err("payload too small")
var
payload = RangesData()
@ -260,10 +278,10 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
payload.fingerprints.add(fingerprint)
elif rangeType == RangeType.ItemSet:
let itemSetLength = getItemSetLength(idx, buffer)
let itemSet = getItemSet(idx, buffer, itemSetLength)
let itemSet = ?getItemSet(idx, buffer, itemSetLength)
payload.itemSets.add(itemSet)
return payload
return ok(payload)
proc decode*(T: type WakuMessageAndTopic, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)

View File

@ -0,0 +1,55 @@
import chronos
import ../../waku_core/time, ../common
proc calculateTimeRange*(
jitter: Duration = 20.seconds, syncRange: Duration = 1.hours
): Slice[Timestamp] =
## Calculates the start and end time of a sync session
var now = getNowInNanosecondTime()
# Because of message jitter inherent to Relay protocol
now -= jitter.nanos
let syncRange = syncRange.nanos
let syncStart = now - syncRange
let syncEnd = now
return Timestamp(syncStart) .. Timestamp(syncEnd)
proc equalPartitioning*(slice: Slice[SyncID], count: int): seq[Slice[SyncID]] =
## Partition into N time slices.
## Remainder is distributed equaly to the first slices.
let totalLength: int64 = slice.b.time - slice.a.time
if totalLength < count:
return @[]
let parts = totalLength div count
var rem = totalLength mod count
var bounds = newSeqOfCap[Slice[SyncID]](count)
var lb = slice.a.time
for i in 0 ..< count:
var ub = lb + parts
if rem > 0:
ub += 1
rem -= 1
let lower = SyncID(time: lb, hash: EmptyFingerprint)
let upper = SyncID(time: ub, hash: EmptyFingerprint)
let bound = lower .. upper
bounds.add(bound)
lb = ub
return bounds
#TODO implement exponential partitioning

View File

@ -0,0 +1,299 @@
import std/[algorithm, sequtils, math, options], results, chronos, stew/arrayops
import
../../waku_core/time,
../../waku_core/message/digest,
../common,
./range_processing,
./storage
type SeqStorage* = ref object of SyncStorage
elements: seq[SyncID]
# Numer of parts a range will be splitted into.
partitionCount: int
# Number of element in a range for which item sets are used instead of fingerprints.
lengthThreshold: int
method length*(self: SeqStorage): int =
return self.elements.len
method insert*(self: SeqStorage, element: SyncID): Result[void, string] {.raises: [].} =
let idx = self.elements.lowerBound(element, common.cmp)
if idx < self.elements.len and self.elements[idx] == element:
return err("duplicate element")
self.elements.insert(element, idx)
return ok()
method batchInsert*(
self: SeqStorage, elements: seq[SyncID]
): Result[void, string] {.raises: [].} =
## Insert the sorted seq of new elements.
if elements.len == 1:
return self.insert(elements[0])
#TODO custom impl. ???
if not elements.isSorted(common.cmp):
return err("seq not sorted")
var merged = newSeqOfCap[SyncID](self.elements.len + elements.len)
merged.merge(self.elements, elements, common.cmp)
self.elements = merged.deduplicate(true)
return ok()
method prune*(self: SeqStorage, timestamp: Timestamp): int {.raises: [].} =
## Remove all elements before the timestamp.
## Returns # of elements pruned.
let bound = SyncID(time: timestamp, hash: EmptyWakuMessageHash)
let idx = self.elements.lowerBound(bound, common.cmp)
self.elements.delete(0 ..< idx)
return idx
proc computefingerprintFromSlice(
self: SeqStorage, sliceOpt: Option[Slice[int]]
): Fingerprint =
## XOR all hashes of a slice of the storage.
var fingerprint = EmptyFingerprint
if sliceOpt.isNone():
return fingerprint
let idxSlice = sliceOpt.get()
for id in self.elements[idxSlice]:
fingerprint = fingerprint xor id.hash
return fingerprint
proc findIdxBounds(self: SeqStorage, slice: Slice[SyncID]): Option[Slice[int]] =
## Given bounds find the corresponding indices in this storage
#TODO can thoses 2 binary search be combined for efficiency ???
let lower = self.elements.lowerBound(slice.a, common.cmp)
var upper = self.elements.upperBound(slice.b, common.cmp)
if upper < 1:
# entire range is before any of our elements
return none(Slice[int])
if lower >= self.elements.len:
# entire range is after any of our elements
return none(Slice[int])
return some(lower ..< upper)
method computeFingerprint*(
self: SeqStorage, bounds: Slice[SyncID]
): Fingerprint {.raises: [].} =
let idxSliceOpt = self.findIdxBounds(bounds)
return self.computefingerprintFromSlice(idxSliceOpt)
proc processFingerprintRange*(
self: SeqStorage,
inputBounds: Slice[SyncID],
inputFingerprint: Fingerprint,
output: var RangesData,
) {.raises: [].} =
## Compares fingerprints and partition new ranges.
let idxSlice = self.findIdxBounds(inputBounds)
let ourFingerprint = self.computeFingerprintFromSlice(idxSlice)
if ourFingerprint == inputFingerprint:
output.ranges.add((inputBounds, RangeType.Skip))
return
if idxSlice.isNone():
output.ranges.add((inputBounds, RangeType.ItemSet))
let state = ItemSet(elements: @[], reconciled: true)
output.itemSets.add(state)
return
let slice = idxSlice.get()
if slice.len <= self.lengthThreshold:
output.ranges.add((inputBounds, RangeType.ItemSet))
let state = ItemSet(elements: self.elements[slice], reconciled: false)
output.itemSets.add(state)
return
let partitions = equalPartitioning(inputBounds, self.partitionCount)
for partitionBounds in partitions:
let sliceOpt = self.findIdxBounds(partitionBounds)
if sliceOpt.isNone():
output.ranges.add((partitionBounds, RangeType.ItemSet))
let state = ItemSet(elements: @[], reconciled: true)
output.itemSets.add(state)
continue
let slice = sliceOpt.get()
if slice.len <= self.lengthThreshold:
output.ranges.add((partitionBounds, RangeType.ItemSet))
let state = ItemSet(elements: self.elements[slice], reconciled: false)
output.itemSets.add(state)
continue
let fingerprint = self.computeFingerprintFromSlice(some(slice))
output.ranges.add((partitionBounds, RangeType.Fingerprint))
output.fingerprints.add(fingerprint)
continue
proc processItemSetRange*(
self: SeqStorage,
inputBounds: Slice[SyncID],
inputItemSet: ItemSet,
hashToSend: var seq[Fingerprint],
hashToRecv: var seq[Fingerprint],
output: var RangesData,
) {.raises: [].} =
## Compares item sets and outputs differences
let idxSlice = self.findIdxBounds(inputBounds)
if idxSlice.isNone():
if not inputItemSet.reconciled:
output.ranges.add((inputBounds, RangeType.ItemSet))
let state = ItemSet(elements: @[], reconciled: true)
output.itemSets.add(state)
else:
output.ranges.add((inputBounds, RangeType.Skip))
return
let slice = idxSlice.get()
var i = 0
let n = inputItemSet.elements.len
var j = slice.a
let m = slice.b + 1
while (j < m):
let ourElement = self.elements[j]
if i >= n:
# in case we have more elements
hashToSend.add(ourElement.hash)
j.inc()
continue
let theirElement = inputItemSet.elements[i]
if theirElement < ourElement:
hashToRecv.add(theirElement.hash)
i.inc()
elif theirElement > ourElement:
hashToSend.add(ourElement.hash)
j.inc()
else:
i.inc()
j.inc()
while (i < n):
# in case they have more elements
let theirElement = inputItemSet.elements[i]
i.inc()
hashToRecv.add(theirElement.hash)
if not inputItemSet.reconciled:
output.ranges.add((inputBounds, RangeType.ItemSet))
let state = ItemSet(elements: self.elements[slice], reconciled: true)
output.itemSets.add(state)
else:
output.ranges.add((inputBounds, RangeType.Skip))
method processPayload*(
self: SeqStorage,
input: RangesData,
hashToSend: var seq[Fingerprint],
hashToRecv: var seq[Fingerprint],
): RangesData {.raises: [].} =
var output = RangesData()
var
i = 0
j = 0
for (bounds, rangeType) in input.ranges:
case rangeType
of RangeType.Skip:
output.ranges.add((bounds, RangeType.Skip))
continue
of RangeType.Fingerprint:
let fingerprint = input.fingerprints[i]
i.inc()
self.processFingerprintRange(bounds, fingerprint, output)
continue
of RangeType.ItemSet:
let itemSet = input.itemsets[j]
j.inc()
self.processItemSetRange(bounds, itemSet, hashToSend, hashToRecv, output)
continue
# merge consecutive skip ranges
var allSkip = true
i = output.ranges.len - 1
while i >= 0:
let currRange = output.ranges[i]
if allSkip and currRange[1] != RangeType.Skip:
allSkip = false
if i <= 0:
break
let prevRange = output.ranges[i - 1]
if currRange[1] != RangeType.Skip or prevRange[1] != RangeType.Skip:
i.dec()
continue
let lb = prevRange[0].a
let ub = currRange[0].b
let newRange = (lb .. ub, RangeType.Skip)
output.ranges.delete(i)
output.ranges[i - 1] = newRange
i.dec()
if allSkip:
output = RangesData()
return output
proc new*(T: type SeqStorage, capacity: int, threshold = 100, partitions = 8): T =
return SeqStorage(
elements: newSeqOfCap[SyncID](capacity),
lengthThreshold: threshold,
partitionCount: partitions,
)
proc new*(
T: type SeqStorage, elements: seq[SyncID], threshold = 100, partitions = 8
): T =
return SeqStorage(
elements: elements, lengthThreshold: threshold, partitionCount: partitions
)

View File

@ -0,0 +1,36 @@
import results
import ../../waku_core/time, ../common
type SyncStorage* = ref object of RootObj
method insert*(
self: SyncStorage, element: SyncID
): Result[void, string] {.base, gcsafe, raises: [].} =
return err("insert method not implemented for SyncStorage")
method batchInsert*(
self: SyncStorage, elements: seq[SyncID]
): Result[void, string] {.base, gcsafe, raises: [].} =
return err("batchInsert method not implemented for SyncStorage")
method prune*(
self: SyncStorage, timestamp: Timestamp
): int {.base, gcsafe, raises: [].} =
-1
method computeFingerprint*(
self: SyncStorage, bounds: Slice[SyncID]
): Fingerprint {.base, gcsafe, raises: [].} =
return EmptyFingerprint
method processPayload*(
self: SyncStorage,
payload: RangesData,
hashToSend: var seq[Fingerprint],
hashToRecv: var seq[Fingerprint],
): RangesData {.base, gcsafe, raises: [].} =
return RangesData()
method length*(self: SyncStorage): int {.base, gcsafe, raises: [].} =
-1