feat: waku store sync 2.0 common types & codec (#3213)

This commit is contained in:
Simon-Pierre Vivier 2025-01-22 11:08:23 -05:00 committed by GitHub
parent fdfc48c923
commit c1b9257948
6 changed files with 636 additions and 0 deletions

View File

@ -0,0 +1,58 @@
import std/[options, random], chronos, chronicles
import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore
randomize()
proc randomHash*(rng: var Rand): WakuMessageHash =
var hash = EmptyWakuMessageHash
for i in 0 ..< hash.len:
hash[i] = rng.rand(uint8)
return hash
proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
needsTx: AsyncQueue[(PeerId, Fingerprint)],
): Future[SyncReconciliation] {.async.} =
let peerManager = PeerManager.new(switch)
let res = await SyncReconciliation.new(
peerManager = peerManager,
wakuArchive = nil,
relayJitter = 0.seconds,
idsRx = idsRx,
wantsTx = wantsTx,
needsTx = needsTx,
)
let proto = res.get()
proto.start()
switch.mount(proto)
return proto
proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
wantsRx: AsyncQueue[(PeerId, Fingerprint)],
needsRx: AsyncQueue[(PeerId, Fingerprint)],
): SyncTransfer =
let peerManager = PeerManager.new(switch)
let proto = SyncTransfer.new(
peerManager = peerManager,
wakuArchive = nil,
idsTx = idsTx,
wantsRx = wantsRx,
needsRx = needsRx,
)
proto.start()
switch.mount(proto)
return proto

View File

@ -0,0 +1,206 @@
{.used.}
import std/[options, random], testutils/unittests, chronos
import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_core/time,
../../waku/waku_store_sync,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/codec,
./sync_utils
proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet =
var
elements = newSeqOfCap[SyncID](count)
lastTime = startTime
for i in 0 ..< count:
let diff = rng.rand(9.uint8) + 1
let timestamp = lastTime + diff * 1_000_000_000
lastTime = timestamp
let hash = randomHash(rng)
let id = SyncID(time: Timestamp(timestamp), fingerprint: hash)
elements.add(id)
return ItemSet(elements: elements, reconciled: true)
proc randomSetRange(
count: int, startTime: Timestamp, rng: var Rand
): (Slice[SyncID], ItemSet) =
let itemSet = randomItemSet(count, startTime, rng)
var
lb = itemSet.elements[0]
ub = itemSet.elements[^1]
#for test check equality
lb.fingerprint = EmptyFingerprint
ub.fingerprint = EmptyFingerprint
let bounds = lb .. ub
return (bounds, itemSet)
suite "Waku Store Sync Codec":
test "empty item set encoding roundtrip":
var origItemSet = ItemSet()
origItemSet.reconciled = true
var encodedSet = origItemSet.deltaEncode()
var itemSet = ItemSet()
let _ = deltaDecode(itemSet, encodedSet, 0)
check:
origItemSet == itemSet
test "item set encoding roundtrip":
let
count = 10
time = getNowInNanosecondTime()
var rng = initRand()
let origItemSet = randomItemSet(count, time, rng)
var encodedSet = origItemSet.deltaEncode()
#faking a longer payload
let pad: seq[byte] =
@[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
encodedSet &= pad
var itemSet = ItemSet()
let _ = deltaDecode(itemSet, encodedSet, count)
check:
origItemSet == itemSet
test "payload item set encoding roundtrip":
let count = 5
var
rng = initRand()
time = getNowInNanosecondTime()
let (bounds1, itemSet1) = randomSetRange(count, time, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng)
let range1 = (bounds1, RangeType.ItemSet)
let range2 = (bounds2, RangeType.ItemSet)
let range3 = (bounds3, RangeType.ItemSet)
let range4 = (bounds4, RangeType.ItemSet)
let payload = RangesData(
ranges: @[range1, range2, range3, range4],
fingerprints: @[],
itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4],
)
let encodedPayload = payload.deltaEncode()
let res = RangesData.deltaDecode(encodedPayload)
assert res.isOk(), $res.error
let decodedPayload = res.get()
check:
payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
payload.itemSets == decodedPayload.itemSets
test "payload fingerprint encoding roundtrip":
let count = 4
var
rng = initRand()
lastTime = getNowInNanosecondTime()
ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)
for i in 0 ..< count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let nowTime = lastTime + 10_000_000_000 # 10s
lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)
ranges.add(range)
let payload = RangesData(
ranges: ranges,
fingerprints:
@[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)],
itemSets: @[],
)
let encodedPayload = payload.deltaEncode()
let res = RangesData.deltaDecode(encodedPayload)
assert res.isOk(), $res.error
let decodedPayload = res.get()
check:
payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
payload.fingerprints == decodedPayload.fingerprints
test "payload mixed encoding roundtrip":
let count = 2
var
rng = initRand()
lastTime = getNowInNanosecondTime()
ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)
itemSets = newSeqOfCap[ItemSet](4)
fingerprints = newSeqOfCap[Fingerprint](4)
for i in 1 .. count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let nowTime = lastTime + 10_000_000_000 # 10s
lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)
ranges.add(range)
fingerprints.add(randomHash(rng))
let (bound, itemSet) = randomSetRange(5, lastTime, rng)
lastTime += 50_000_000_000 # 50s
ranges.add((bound, RangeType.ItemSet))
itemSets.add(itemSet)
let payload =
RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets)
let encodedPayload = payload.deltaEncode()
let res = RangesData.deltaDecode(encodedPayload)
assert res.isOk(), $res.error
let decodedPayload = res.get()
check:
payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
payload.fingerprints == decodedPayload.fingerprints
payload.itemSets == decodedPayload.itemSets

View File

@ -5,6 +5,8 @@ const
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"

View File

@ -48,3 +48,11 @@ proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessag
ctx.update(toBytesBE(uint64(msg.timestamp)))
return ctx.finish() # Computes the hash
proc cmp*(x, y: WakuMessageHash): int =
if x < y:
return -1
elif x == y:
return 0
return 1

View File

@ -0,0 +1,281 @@
{.push raises: [].}
import std/sequtils, stew/leb128
import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common
const
HashLen = 32
VarIntLen = 9
AvgCapacity = 1000
proc encode*(value: WakuMessageAndTopic): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, value.pubsub)
pb.write3(2, value.message.encode())
return pb
proc deltaEncode*(itemSet: ItemSet): seq[byte] =
# 1 byte for resolved bool and 32 bytes hash plus 9 bytes varint per elements
let capacity = 1 + (itemSet.elements.len * (VarIntLen + HashLen))
var
output = newSeqOfCap[byte](capacity)
lastTime = Timestamp(0)
buf = Leb128Buf[uint64]()
for id in itemSet.elements:
let timeDiff = uint64(id.time) - uint64(lastTime)
lastTime = id.time
# encode timestamp
buf = timediff.toBytes(Leb128)
output &= @buf
output &= id.hash
output &= byte(itemSet.reconciled)
return output
proc deltaEncode*(value: RangesData): seq[byte] =
if value.ranges.len == 0:
return @[0]
var
output = newSeqOfCap[byte](AvgCapacity)
buf = Leb128Buf[uint64]()
lastTimestamp: Timestamp
lastHash: Fingerprint
i = 0
j = 0
# the first range is implicit but must be explicit when encoded
let (bound, _) = value.ranges[0]
lastTimestamp = bound.a.time
lastHash = bound.a.hash
# encode first timestamp
buf = uint64(lastTimestamp).toBytes(Leb128)
output &= @buf
# implicit first hash is always 0 and range type is always skip
for (bound, rangeType) in value.ranges:
let timeDiff = uint64(bound.b.time) - uint64(lastTimestamp)
lastTimestamp = bound.b.time
# encode timestamp
buf = timeDiff.toBytes(Leb128)
output &= @buf
if timeDiff == 0:
var sameBytes = 0
for (byte1, byte2) in zip(lastHash, bound.b.hash):
sameBytes.inc()
if byte1 != byte2:
break
# encode number of same bytes
output &= byte(sameBytes)
# encode hash bytes
output &= bound.b.hash[0 ..< sameBytes]
# encode rangeType
output &= byte(rangeType)
case rangeType
of RangeType.Skip:
continue
of RangeType.Fingerprint:
output &= value.fingerprints[i]
i.inc()
of RangeType.ItemSet:
let itemSet = value.itemSets[j]
j.inc()
# encode how many elements are in the set
buf = uint64(itemSet.elements.len).toBytes(Leb128)
output &= @buf
let encodedSet = itemSet.deltaEncode()
output &= encodedSet
continue
return output
proc getItemSet(idx: var int, buffer: seq[byte], itemSetLength: int): ItemSet =
var itemSet = ItemSet()
let slice = buffer[idx ..< buffer.len]
let count = deltaDecode(itemSet, slice, itemSetLength)
idx += count
proc getItemSetLength(idx: var int, buffer: seq[byte]): int =
let min = min(idx + VarIntLen, buffer.len)
let slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len
return int(val)
proc getFingerprint(idx: var int, buffer: seq[byte]): Result[Fingerprint, string] =
if idx + HashLen > buffer.len:
return err("Cannot decode fingerprint")
let slice = buffer[idx ..< idx + HashLen]
idx += HashLen
var fingerprint = EmptyFingerprint
for i, bytes in slice:
fingerprint[i] = bytes
return ok(fingerprint)
proc getRangeType(idx: var int, buffer: seq[byte]): Result[RangeType, string] =
if idx >= buffer.len:
return err("Cannot decode range type")
let rangeType = RangeType(buffer[idx])
idx += 1
return ok(rangeType)
proc updateHash(idx: var int, buffer: seq[byte], hash: var WakuMessageHash) =
if idx >= buffer.len:
return
let sameBytes = int(buffer[idx])
idx += 1
if idx + sameBytes > buffer.len:
return
let slice = buffer[idx ..< idx + sameBytes]
idx += sameBytes
for i, bytes in slice:
hash[i] = bytes
proc getTimeDiff(idx: var int, buffer: seq[byte]): Timestamp =
let min = min(idx + VarIntLen, buffer.len)
let slice = buffer[idx ..< min]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len
return Timestamp(val)
proc getTimestamp(idx: var int, buffer: seq[byte]): Result[Timestamp, string] =
if idx + VarIntLen > buffer.len:
return err("Cannot decode timestamp")
let slice = buffer[idx ..< idx + VarIntLen]
(val, len) = uint64.fromBytes(slice, Leb128)
idx += len
return ok(Timestamp(val))
proc getHash(idx: var int, buffer: seq[byte]): Result[WakuMessageHash, string] =
if idx + HashLen > buffer.len:
return err("Cannot decode hash")
let slice = buffer[idx ..< idx + HashLen]
idx += HashLen
var hash = EmptyWakuMessageHash
for i, bytes in slice:
hash[i] = bytes
return ok(hash)
proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] =
if idx >= buffer.len:
return err("Cannot decode reconciled")
let recon = bool(buffer[idx])
idx += 1
return ok(recon)
proc deltaDecode*(
itemSet: var ItemSet, buffer: seq[byte], setLength: int
): Result[int, string] =
var
lastTime = Timestamp(0)
idx = 0
while itemSet.elements.len < setLength:
let timeDiff = ?getTimestamp(idx, buffer)
let time = lastTime + timeDiff
lastTime = time
let hash = ?getHash(idx, buffer)
let id = SyncID(time: time, hash: hash)
itemSet.elements.add(id)
itemSet.reconciled = ?getReconciled(idx, buffer)
return idx
proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
if buffer.len == 1:
return RangesData()
var
payload = RangesData()
lastTime = Timestamp(0)
idx = 0
lastTime = ?getTimestamp(idx, buffer)
# implicit first hash is always 0
# implicit first range mode is alway skip
while idx < buffer.len - 1:
let lowerRangeBound = SyncID(time: lastTime, hash: EmptyWakuMessageHash)
let timeDiff = getTimeDiff(idx, buffer)
var hash = EmptyWakuMessageHash
if timeDiff == 0:
updateHash(idx, buffer, hash)
let thisTime = lastTime + timeDiff
lastTime = thisTime
let upperRangeBound = SyncID(time: thisTime, hash: hash)
let bounds = lowerRangeBound .. upperRangeBound
let rangeType = ?getRangeType(idx, buffer)
payload.ranges.add((bounds, rangeType))
if rangeType == RangeType.Fingerprint:
let fingerprint = ?getFingerprint(idx, buffer)
payload.fingerprints.add(fingerprint)
elif rangeType == RangeType.ItemSet:
let itemSetLength = getItemSetLength(idx, buffer)
let itemSet = getItemSet(idx, buffer, itemSetLength)
payload.itemSets.add(itemSet)
return payload
proc decode*(T: type WakuMessageAndTopic, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var pubsub: string
if not ?pb.getField(1, pubsub):
return err(ProtobufError.missingRequiredField("pubsub"))
var proto: ProtoBuffer
if not ?pb.getField(2, proto):
return err(ProtobufError.missingRequiredField("msg"))
let message = ?WakuMessage.decode(proto.buffer)
return ok(WakuMessageAndTopic(pubsub: pubsub, message: message))

View File

@ -0,0 +1,81 @@
{.push raises: [].}
import std/[options], chronos, stew/[byteutils]
import ../waku_core
const
DefaultSyncInterval*: Duration = 5.minutes
DefaultSyncRange*: Duration = 1.hours
DefaultGossipSubJitter*: Duration = 20.seconds
type
Fingerprint* = array[32, byte]
SyncID* = object
time*: Timestamp
hash*: WakuMessageHash
ItemSet* = object
elements*: seq[SyncID]
reconciled*: bool
RangeType* {.pure.} = enum
Skip = 0
Fingerprint = 1
ItemSet = 2
RangesData* = object
ranges*: seq[(Slice[SyncID], RangeType)]
fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order
itemSets*: seq[ItemSet] # Range type itemset stored here in order
WakuMessageAndTopic* = object
pubsub*: PubSubTopic
message*: WakuMessage
const EmptyFingerprint*: Fingerprint = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
]
const FullFingerprint*: Fingerprint = [
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
]
proc high*(T: type SyncID): T =
## Same as high(int) but for IDs
return SyncID(time: Timestamp(high(int64)), fingerprint: FullFingerprint)
proc low*(T: type SyncID): T =
## Same as low(int) but for IDs
return SyncID(time: Timestamp(low(int64)), fingerprint: EmptyFingerprint)
proc `$`*(value: SyncID): string =
return '(' & $value.time & ", " & $value.hash & ')'
proc cmp(x, y: Fingerprint): int =
if x < y:
return -1
elif x == y:
return 0
return 1
proc cmp*(x, y: SyncID): int =
if x.time == y.time:
return cmp(x.hash, y.hash)
if x.time < y.time:
return -1
return 1
proc `<`*(x, y: SyncID): bool =
cmp(x, y) == -1
proc `>`*(x, y: SyncID): bool =
cmp(x, y) == 1