From c1b9257948bcbe73fa3df7550d6d18807c7dcc57 Mon Sep 17 00:00:00 2001
From: Simon-Pierre Vivier
Date: Wed, 22 Jan 2025 11:08:23 -0500
Subject: [PATCH] feat: waku store sync 2.0 common types & codec (#3213)

---
 tests/waku_store_sync/sync_utils.nim |  63 ++++++
 tests/waku_store_sync/test_codec.nim | 213 +++++++++++++++++++
 waku/waku_core/codecs.nim            |   2 +
 waku/waku_core/message/digest.nim    |   8 +
 waku/waku_store_sync/codec.nim       | 336 +++++++++++++++++++++++++++
 waku/waku_store_sync/common.nim      |  86 ++++++++
 6 files changed, 708 insertions(+)
 create mode 100644 tests/waku_store_sync/sync_utils.nim
 create mode 100644 tests/waku_store_sync/test_codec.nim
 create mode 100644 waku/waku_store_sync/codec.nim
 create mode 100644 waku/waku_store_sync/common.nim

diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim
new file mode 100644
index 000000000..e32d0fefa
--- /dev/null
+++ b/tests/waku_store_sync/sync_utils.nim
@@ -0,0 +1,63 @@
+import std/[options, random], chronos, chronicles
+
+import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore
+
+randomize()
+
+proc randomHash*(rng: var Rand): WakuMessageHash =
+  ## Returns a hash whose bytes are all drawn from `rng`.
+  var hash = EmptyWakuMessageHash
+
+  for b in hash.mitems:
+    b = rng.rand(uint8)
+
+  return hash
+
+proc newTestWakuRecon*(
+    switch: Switch,
+    idsRx: AsyncQueue[SyncID],
+    wantsTx: AsyncQueue[(PeerId, Fingerprint)],
+    needsTx: AsyncQueue[(PeerId, Fingerprint)],
+): Future[SyncReconciliation] {.async.} =
+  ## Builds a reconciliation protocol without archive nor relay jitter,
+  ## starts it and mounts it on `switch`.
+  let peerManager = PeerManager.new(switch)
+
+  let res = await SyncReconciliation.new(
+    peerManager = peerManager,
+    wakuArchive = nil,
+    relayJitter = 0.seconds,
+    idsRx = idsRx,
+    wantsTx = wantsTx,
+    needsTx = needsTx,
+  )
+
+  let proto = res.get()
+
+  proto.start()
+  switch.mount(proto)
+
+  return proto
+
+proc newTestWakuTransfer*(
+    switch: Switch,
+    idsTx: AsyncQueue[SyncID],
+    wantsRx: AsyncQueue[(PeerId, Fingerprint)],
+    needsRx: AsyncQueue[(PeerId, Fingerprint)],
+): SyncTransfer =
+  ## Builds a transfer protocol without archive, starts it and mounts it
+  ## on `switch`.
+  let peerManager = PeerManager.new(switch)
+
+  let proto = SyncTransfer.new(
+    peerManager = peerManager,
+    wakuArchive = nil,
+    idsTx = idsTx,
+    wantsRx = wantsRx,
+    needsRx = needsRx,
+  )
+
+  proto.start()
+  switch.mount(proto)
+
+  return proto
diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim
new file mode 100644
index 000000000..9e7a85924
--- /dev/null
+++ b/tests/waku_store_sync/test_codec.nim
@@ -0,0 +1,213 @@
+{.used.}
+
+import std/[options, random], testutils/unittests, chronos
+
+import
+  ../../waku/waku_core,
+  ../../waku/waku_core/message/digest,
+  ../../waku/waku_core/time,
+  ../../waku/waku_store_sync,
+  ../../waku/waku_store_sync/common,
+  ../../waku/waku_store_sync/codec,
+  ./sync_utils
+
+proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet =
+  ## Builds a reconciled set of `count` random IDs, each 1 to 10 seconds
+  ## after the previous one, the first being relative to `startTime`.
+  var
+    elements = newSeqOfCap[SyncID](count)
+    lastTime = startTime
+
+  for i in 0 ..< count:
+    # whole seconds, held in Timestamp width so the product below cannot wrap
+    let diff = Timestamp(rng.rand(9) + 1)
+
+    let timestamp = lastTime + diff * 1_000_000_000
+    lastTime = timestamp
+
+    let hash = randomHash(rng)
+
+    let id = SyncID(time: Timestamp(timestamp), hash: hash)
+
+    elements.add(id)
+
+  return ItemSet(elements: elements, reconciled: true)
+
+proc randomSetRange(
+    count: int, startTime: Timestamp, rng: var Rand
+): (Slice[SyncID], ItemSet) =
+  ## Returns a random item set along with the range spanning its first and
+  ## last elements.
+  let itemSet = randomItemSet(count, startTime, rng)
+
+  var
+    lb = itemSet.elements[0]
+    ub = itemSet.elements[^1]
+
+  #for test check equality
+  lb.hash = EmptyFingerprint
+  ub.hash = EmptyFingerprint
+
+  let bounds = lb .. ub
+
+  return (bounds, itemSet)
+
+suite "Waku Store Sync Codec":
+  test "empty item set encoding roundtrip":
+    var origItemSet = ItemSet()
+
+    origItemSet.reconciled = true
+
+    var encodedSet = origItemSet.deltaEncode()
+
+    var itemSet = ItemSet()
+    let res = deltaDecode(itemSet, encodedSet, 0)
+    assert res.isOk(), $res.error
+
+    check:
+      origItemSet == itemSet
+
+  test "item set encoding roundtrip":
+    let
+      count = 10
+      time = getNowInNanosecondTime()
+
+    var rng = initRand()
+
+    let origItemSet = randomItemSet(count, time, rng)
+    var encodedSet = origItemSet.deltaEncode()
+
+    #faking a longer payload
+    let pad: seq[byte] =
+      @[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+    encodedSet &= pad
+
+    var itemSet = ItemSet()
+    let res = deltaDecode(itemSet, encodedSet, count)
+    assert res.isOk(), $res.error
+
+    check:
+      origItemSet == itemSet
+
+  test "payload item set encoding roundtrip":
+    let count = 5
+
+    var
+      rng = initRand()
+      time = getNowInNanosecondTime()
+
+    let (bounds1, itemSet1) = randomSetRange(count, time, rng)
+    let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng)
+    let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng)
+    let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng)
+
+    let range1 = (bounds1, RangeType.ItemSet)
+    let range2 = (bounds2, RangeType.ItemSet)
+    let range3 = (bounds3, RangeType.ItemSet)
+    let range4 = (bounds4, RangeType.ItemSet)
+
+    let payload = RangesData(
+      ranges: @[range1, range2, range3, range4],
+      fingerprints: @[],
+      itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4],
+    )
+
+    let encodedPayload = payload.deltaEncode()
+
+    let res = RangesData.deltaDecode(encodedPayload)
+    assert res.isOk(), $res.error
+
+    let decodedPayload = res.get()
+
+    check:
+      payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
+      payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
+      payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
+      payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
+      payload.itemSets == decodedPayload.itemSets
+
+  test "payload fingerprint encoding roundtrip":
+    let count = 4
+
+    var
+      rng = initRand()
+      lastTime = getNowInNanosecondTime()
+      ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)
+
+    for i in 0 ..< count:
+      let lb = SyncID(time: Timestamp(lastTime), hash: EmptyFingerprint)
+
+      let nowTime = lastTime + 10_000_000_000 # 10s
+
+      lastTime = nowTime
+      let ub = SyncID(time: Timestamp(nowTime), hash: EmptyFingerprint)
+      let bounds = lb .. ub
+      let range = (bounds, RangeType.Fingerprint)
+
+      ranges.add(range)
+
+    let payload = RangesData(
+      ranges: ranges,
+      fingerprints:
+        @[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)],
+      itemSets: @[],
+    )
+
+    let encodedPayload = payload.deltaEncode()
+
+    let res = RangesData.deltaDecode(encodedPayload)
+    assert res.isOk(), $res.error
+
+    let decodedPayload = res.get()
+
+    check:
+      payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
+      payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
+      payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
+      payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
+      payload.fingerprints == decodedPayload.fingerprints
+
+  test "payload mixed encoding roundtrip":
+    let count = 2
+
+    var
+      rng = initRand()
+      lastTime = getNowInNanosecondTime()
+      ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)
+      itemSets = newSeqOfCap[ItemSet](4)
+      fingerprints = newSeqOfCap[Fingerprint](4)
+
+    for i in 1 .. count:
+      let lb = SyncID(time: Timestamp(lastTime), hash: EmptyFingerprint)
+      let nowTime = lastTime + 10_000_000_000 # 10s
+      lastTime = nowTime
+      let ub = SyncID(time: Timestamp(nowTime), hash: EmptyFingerprint)
+      let bounds = lb .. ub
+      let range = (bounds, RangeType.Fingerprint)
+
+      ranges.add(range)
+      fingerprints.add(randomHash(rng))
+
+      let (bound, itemSet) = randomSetRange(5, lastTime, rng)
+      lastTime += 50_000_000_000 # 50s
+
+      ranges.add((bound, RangeType.ItemSet))
+      itemSets.add(itemSet)
+
+    let payload =
+      RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets)
+
+    let encodedPayload = payload.deltaEncode()
+
+    let res = RangesData.deltaDecode(encodedPayload)
+    assert res.isOk(), $res.error
+
+    let decodedPayload = res.get()
+
+    check:
+      payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
+      payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
+      payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
+      payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
+      payload.fingerprints == decodedPayload.fingerprints
+      payload.itemSets == decodedPayload.itemSets
diff --git a/waku/waku_core/codecs.nim b/waku/waku_core/codecs.nim
index 2b5b8f7b4..35a050b72 100644
--- a/waku/waku_core/codecs.nim
+++ b/waku/waku_core/codecs.nim
@@ -5,6 +5,8 @@ const
   WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
   WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
   WakuSyncCodec* = "/vac/waku/sync/1.0.0"
+  WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
+  WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
   WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
   WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
   WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"
diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim
index cb4f5b014..0aa1ce610 100644
--- a/waku/waku_core/message/digest.nim
+++ b/waku/waku_core/message/digest.nim
@@ -48,3 +48,11 @@ proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessag
   ctx.update(toBytesBE(uint64(msg.timestamp)))
 
   return ctx.finish() # Computes the hash
+
+proc cmp*(x, y: WakuMessageHash): int =
+  if x < y:
+    return -1
+  elif x == y:
+    return 0
+
+  return 1
diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim
new file mode 100644
index 000000000..c02702bd1
--- /dev/null
+++ b/waku/waku_store_sync/codec.nim
@@ -0,0 +1,336 @@
+{.push raises: [].}
+
+import std/sequtils, stew/leb128
+
+import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common
+
+const
+  HashLen = 32
+  VarIntLen = 9
+  AvgCapacity = 1000
+
+proc encode*(value: WakuMessageAndTopic): ProtoBuffer =
+  ## Writes the pubsub topic as field 1 and the encoded message as field 2.
+  var pb = initProtoBuffer()
+
+  pb.write3(1, value.pubsub)
+  pb.write3(2, value.message.encode())
+
+  return pb
+
+proc deltaEncode*(itemSet: ItemSet): seq[byte] =
+  ## Encodes each element as a LEB128 time delta (relative to the previous
+  ## element, the first one relative to zero) followed by its hash, then
+  ## appends one byte holding the `reconciled` flag.
+
+  # 1 byte for the reconciled flag, plus a varint of at most 9 bytes and a
+  # 32 bytes hash per element
+  let capacity = 1 + (itemSet.elements.len * (VarIntLen + HashLen))
+
+  var
+    output = newSeqOfCap[byte](capacity)
+    lastTime = Timestamp(0)
+    buf = Leb128Buf[uint64]()
+
+  for id in itemSet.elements:
+    let timeDiff = uint64(id.time) - uint64(lastTime)
+    lastTime = id.time
+
+    # encode timestamp
+    buf = timeDiff.toBytes(Leb128)
+    output &= @buf
+
+    output &= id.hash
+
+  output &= byte(itemSet.reconciled)
+
+  return output
+
+proc deltaEncode*(value: RangesData): seq[byte] =
+  ## Encodes the lower bound time of the first range, then for each range:
+  ## the LEB128 delta to its upper bound time, a hash prefix when that delta
+  ## is zero, the range type and the payload matching that type.
+  ## An empty `ranges` is encoded as a single zero byte.
+  if value.ranges.len == 0:
+    return @[0'u8]
+
+  var
+    output = newSeqOfCap[byte](AvgCapacity)
+    buf = Leb128Buf[uint64]()
+    lastTimestamp: Timestamp
+    lastHash: Fingerprint
+    i = 0
+    j = 0
+
+  # the first range is implicit but must be explicit when encoded
+  let (bound, _) = value.ranges[0]
+
+  lastTimestamp = bound.a.time
+  lastHash = bound.a.hash
+
+  # encode first timestamp
+  buf = uint64(lastTimestamp).toBytes(Leb128)
+  output &= @buf
+
+  # implicit first hash is always 0 and range type is always skip
+
+  for (bound, rangeType) in value.ranges:
+    let timeDiff = uint64(bound.b.time) - uint64(lastTimestamp)
+    lastTimestamp = bound.b.time
+
+    # encode timestamp
+    buf = timeDiff.toBytes(Leb128)
+    output &= @buf
+
+    if timeDiff == 0:
+      var sameBytes = 0
+      for (byte1, byte2) in zip(lastHash, bound.b.hash):
+        sameBytes.inc()
+
+        if byte1 != byte2:
+          break
+
+      # encode number of same bytes
+      output &= byte(sameBytes)
+
+      # encode hash bytes
+      output &= bound.b.hash[0 ..< sameBytes]
+
+    # encode rangeType
+    output &= byte(rangeType)
+
+    case rangeType
+    of RangeType.Skip:
+      continue
+    of RangeType.Fingerprint:
+      output &= value.fingerprints[i]
+      i.inc()
+    of RangeType.ItemSet:
+      let itemSet = value.itemSets[j]
+      j.inc()
+
+      # encode how many elements are in the set
+      buf = uint64(itemSet.elements.len).toBytes(Leb128)
+      output &= @buf
+
+      let encodedSet = itemSet.deltaEncode()
+
+      output &= encodedSet
+
+      continue
+
+  return output
+
+proc getVarInt(
+    idx: var int, buffer: seq[byte], name: string
+): Result[uint64, string] =
+  ## Reads the LEB128 varint starting at `idx` and moves `idx` past it.
+  ## `name` only serves to build the error message.
+  if idx >= buffer.len:
+    return err("Cannot decode " & name)
+
+  # this codec reads at most `VarIntLen` bytes per varint
+  let slice = buffer[idx ..< min(idx + VarIntLen, buffer.len)]
+  let (val, size) = uint64.fromBytes(slice, Leb128)
+
+  # a size of 0 or less means the varint is incomplete or overflows
+  if size <= 0:
+    return err("Cannot decode " & name)
+
+  idx += int(size)
+
+  return ok(val)
+
+proc getTimestamp(idx: var int, buffer: seq[byte]): Result[Timestamp, string] =
+  ## Reads a varint encoded timestamp, either absolute or a delta.
+  let val = ?getVarInt(idx, buffer, "timestamp")
+
+  # 9 bytes of LEB128 hold 63 bits at most but stay on the safe side
+  if val > uint64(high(Timestamp)):
+    return err("Cannot decode timestamp")
+
+  return ok(Timestamp(val))
+
+proc addTimeDiff(last, diff: Timestamp): Result[Timestamp, string] =
+  ## Returns `last + diff`, or an error when the sum overflows.
+  ## Both arguments are expected to be decoded values, hence not negative.
+  if diff > high(Timestamp) - last:
+    return err("Cannot decode timestamp")
+
+  return ok(last + diff)
+
+proc getHash(idx: var int, buffer: seq[byte]): Result[WakuMessageHash, string] =
+  ## Reads the `HashLen` bytes of a message hash.
+  if idx + HashLen > buffer.len:
+    return err("Cannot decode hash")
+
+  let slice = buffer[idx ..< idx + HashLen]
+  idx += HashLen
+  var hash = EmptyWakuMessageHash
+  for i, bytes in slice:
+    hash[i] = bytes
+
+  return ok(hash)
+
+proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] =
+  ## Reads the flag byte closing an item set; only 0 and 1 are accepted.
+  if idx >= buffer.len or buffer[idx] > 1:
+    return err("Cannot decode reconciled")
+
+  let recon = buffer[idx] == 1
+  idx += 1
+
+  return ok(recon)
+
+proc deltaDecode*(
+    itemSet: var ItemSet, buffer: seq[byte], setLength: int
+): Result[int, string] =
+  ## Decodes elements from the start of `buffer` into `itemSet` until it
+  ## holds `setLength` of them, then reads its `reconciled` flag.
+  ## Returns how many bytes of `buffer` were read.
+  var
+    lastTime = Timestamp(0)
+    idx = 0
+
+  while itemSet.elements.len < setLength:
+    let timeDiff = ?getTimestamp(idx, buffer)
+    let time = ?addTimeDiff(lastTime, timeDiff)
+    lastTime = time
+
+    let hash = ?getHash(idx, buffer)
+
+    let id = SyncID(time: time, hash: hash)
+
+    itemSet.elements.add(id)
+
+  itemSet.reconciled = ?getReconciled(idx, buffer)
+
+  return ok(idx)
+
+proc getItemSet(
+    idx: var int, buffer: seq[byte], itemSetLength: int
+): Result[ItemSet, string] =
+  ## Decodes the item set starting at `idx` and moves `idx` past it.
+  var itemSet = ItemSet()
+  let slice = buffer[idx ..< buffer.len]
+  let count = ?deltaDecode(itemSet, slice, itemSetLength)
+  idx += count
+
+  return ok(itemSet)
+
+proc getItemSetLength(idx: var int, buffer: seq[byte]): Result[int, string] =
+  ## Reads the varint holding the number of elements of an item set.
+  let val = ?getVarInt(idx, buffer, "item set length")
+
+  if val > uint64(high(int)):
+    return err("Cannot decode item set length")
+
+  return ok(int(val))
+
+proc getFingerprint(idx: var int, buffer: seq[byte]): Result[Fingerprint, string] =
+  ## Reads the `HashLen` bytes of a range fingerprint.
+  if idx + HashLen > buffer.len:
+    return err("Cannot decode fingerprint")
+
+  let slice = buffer[idx ..< idx + HashLen]
+  idx += HashLen
+  var fingerprint = EmptyFingerprint
+  for i, bytes in slice:
+    fingerprint[i] = bytes
+
+  return ok(fingerprint)
+
+proc getRangeType(idx: var int, buffer: seq[byte]): Result[RangeType, string] =
+  ## Reads the range type byte, rejecting values that are not a `RangeType`.
+  if idx >= buffer.len or buffer[idx] > byte(high(RangeType)):
+    return err("Cannot decode range type")
+
+  let rangeType = RangeType(buffer[idx])
+  idx += 1
+
+  return ok(rangeType)
+
+proc updateHash(
+    idx: var int, buffer: seq[byte], hash: var WakuMessageHash
+): Result[void, string] =
+  ## Reads a length byte then that many bytes, which overwrite the start of
+  ## `hash`; the remaining bytes of `hash` are left untouched.
+  if idx >= buffer.len:
+    return err("Cannot decode hash prefix")
+
+  let sameBytes = int(buffer[idx])
+  idx += 1
+
+  # the prefix must fit both the hash and what is left of the buffer
+  if sameBytes > HashLen or idx + sameBytes > buffer.len:
+    return err("Cannot decode hash prefix")
+
+  let slice = buffer[idx ..< idx + sameBytes]
+  idx += sameBytes
+
+  for i, bytes in slice:
+    hash[i] = bytes
+
+  return ok()
+
+proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
+  ## Decodes the output of `deltaEncode`. A buffer of exactly one byte is an
+  ## empty payload. Every lower bound gets the time of the previous upper
+  ## bound (the leading timestamp for the first range) and an empty hash.
+  if buffer.len == 1:
+    return ok(RangesData())
+
+  var
+    payload = RangesData()
+    lastTime = Timestamp(0)
+    idx = 0
+
+  lastTime = ?getTimestamp(idx, buffer)
+
+  # implicit first hash is always 0
+  # implicit first range mode is always skip
+
+  while idx < buffer.len - 1:
+    let lowerRangeBound = SyncID(time: lastTime, hash: EmptyWakuMessageHash)
+
+    let timeDiff = ?getTimestamp(idx, buffer)
+
+    var hash = EmptyWakuMessageHash
+    if timeDiff == 0:
+      ?updateHash(idx, buffer, hash)
+
+    let thisTime = ?addTimeDiff(lastTime, timeDiff)
+    lastTime = thisTime
+
+    let upperRangeBound = SyncID(time: thisTime, hash: hash)
+    let bounds = lowerRangeBound .. upperRangeBound
+
+    let rangeType = ?getRangeType(idx, buffer)
+    payload.ranges.add((bounds, rangeType))
+
+    if rangeType == RangeType.Fingerprint:
+      let fingerprint = ?getFingerprint(idx, buffer)
+      payload.fingerprints.add(fingerprint)
+    elif rangeType == RangeType.ItemSet:
+      let itemSetLength = ?getItemSetLength(idx, buffer)
+      let itemSet = ?getItemSet(idx, buffer, itemSetLength)
+      payload.itemSets.add(itemSet)
+
+  return ok(payload)
+
+proc decode*(T: type WakuMessageAndTopic, buffer: seq[byte]): ProtobufResult[T] =
+  ## Reads the pubsub topic from field 1 and the message from field 2,
+  ## failing when either field is missing.
+  let pb = initProtoBuffer(buffer)
+
+  var pubsub: string
+  if not ?pb.getField(1, pubsub):
+    return err(ProtobufError.missingRequiredField("pubsub"))
+
+  var proto: ProtoBuffer
+  if not ?pb.getField(2, proto):
+    return err(ProtobufError.missingRequiredField("msg"))
+
+  let message = ?WakuMessage.decode(proto.buffer)
+
+  return ok(WakuMessageAndTopic(pubsub: pubsub, message: message))
diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim
new file mode 100644
index 000000000..279545078
--- /dev/null
+++ b/waku/waku_store_sync/common.nim
@@ -0,0 +1,86 @@
+{.push raises: [].}
+
+import std/[options], chronos, stew/[byteutils]
+
+import ../waku_core
+
+const
+  DefaultSyncInterval*: Duration = 5.minutes
+  DefaultSyncRange*: Duration = 1.hours
+  DefaultGossipSubJitter*: Duration = 20.seconds
+
+type
+  Fingerprint* = array[32, byte]
+
+  SyncID* = object
+    time*: Timestamp
+    hash*: WakuMessageHash
+
+  ItemSet* = object
+    elements*: seq[SyncID]
+    reconciled*: bool
+
+  RangeType* {.pure.} = enum
+    Skip = 0
+    Fingerprint = 1
+    ItemSet = 2
+
+  RangesData* = object
+    ranges*: seq[(Slice[SyncID], RangeType)]
+    fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order
+    itemSets*: seq[ItemSet] # Range type itemset stored here in order
+
+  WakuMessageAndTopic* = object
+    pubsub*: PubSubTopic
+    message*: WakuMessage
+
+const EmptyFingerprint*: Fingerprint = [
+  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+  0, 0, 0,
+]
+
+const FullFingerprint*: Fingerprint = [
+  255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
+  255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
+]
+
+proc high*(T: type SyncID): T =
+  ## Same as high(int) but for IDs
+
+  return SyncID(time: Timestamp(high(int64)), hash: FullFingerprint)
+
+proc low*(T: type SyncID): T =
+  ## Same as low(int) but for IDs
+
+  return SyncID(time: Timestamp(low(int64)), hash: EmptyFingerprint)
+
+proc `$`*(value: SyncID): string =
+  ## Renders an ID as `(time, hash)`.
+  return '(' & $value.time & ", " & $value.hash & ')'
+
+proc cmp(x, y: Fingerprint): int =
+  ## Returns -1, 0 or 1 when `x` is lower than, equal to or greater than `y`.
+  if x < y:
+    return -1
+  elif x == y:
+    return 0
+
+  return 1
+
+proc cmp*(x, y: SyncID): int =
+  ## Orders IDs by time first, hashes only breaking ties.
+  if x.time == y.time:
+    return cmp(x.hash, y.hash)
+
+  if x.time < y.time:
+    return -1
+
+  return 1
+
+proc `<`*(x, y: SyncID): bool =
+  ## True when `x` sorts before `y` according to `cmp`.
+  cmp(x, y) == -1
+
+proc `>`*(x, y: SyncID): bool =
+  ## True when `x` sorts after `y` according to `cmp`.
+  cmp(x, y) == 1