diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim new file mode 100644 index 000000000..b949b89bf --- /dev/null +++ b/tests/waku_store_sync/sync_utils.nim @@ -0,0 +1,60 @@ +{.used.} + +import std/[options, random], chronos, chronicles + +import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore + +randomize() + +proc randomHash*(rng: var Rand): WakuMessageHash = + var hash = EmptyWakuMessageHash + + for i in 0 ..< hash.len: + hash[i] = rng.rand(uint8) + + return hash + +proc newTestWakuRecon*( + switch: Switch, + idsRx: AsyncQueue[ID], + wantsTx: AsyncQueue[(PeerId, Fingerprint)], + needsTx: AsyncQueue[(PeerId, Fingerprint)], +): Future[SyncReconciliation] {.async.} = + let peerManager = PeerManager.new(switch) + + let res = await SyncReconciliation.new( + peerManager = peerManager, + wakuArchive = nil, + relayJitter = 0.seconds, + idsRx = idsRx, + wantsTx = wantsTx, + needsTx = needsTx, + ) + + let proto = res.get() + + proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuTransfer*( + switch: Switch, + idsTx: AsyncQueue[ID], + wantsRx: AsyncQueue[(PeerId, Fingerprint)], + needsRx: AsyncQueue[(PeerId, Fingerprint)], +): SyncTransfer = + let peerManager = PeerManager.new(switch) + + let proto = SyncTransfer.new( + peerManager = peerManager, + wakuArchive = nil, + idsTx = idsTx, + wantsRx = wantsRx, + needsRx = needsRx, + ) + + proto.start() + switch.mount(proto) + + return proto diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim new file mode 100644 index 000000000..132c67c67 --- /dev/null +++ b/tests/waku_store_sync/test_codec.nim @@ -0,0 +1,197 @@ +{.used.} + +import std/[options, random], testutils/unittests, chronos + +import + ../../waku/waku_core, + ../../waku/waku_core/message/digest, + ../../waku/waku_core/time, + ../../waku/waku_store_sync, + ../../waku/waku_store_sync/common, + ../../waku/waku_store_sync/codec, + ./sync_utils + +proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet = + var + elements = newSeqOfCap[ID](count) + lastTime = startTime + + for i in 0 ..< count: + let diff = rng.rand(9.uint8) + 1 + + let timestamp = lastTime + diff * 1_000_000_000 + lastTime = timestamp + + let hash = randomHash(rng) + + let id = ID(time: Timestamp(timestamp), fingerprint: hash) + + elements.add(id) + + return ItemSet(elements: elements, reconciled: true) + +proc randomSetRange( + count: int, startTime: Timestamp, rng: var Rand +): (Slice[ID], ItemSet) = + let itemSet = randomItemSet(count, startTime, rng) + + var + lb = itemSet.elements[0] + ub = itemSet.elements[^1] + + #for test check equality + lb.fingerprint = EmptyFingerprint + ub.fingerprint = EmptyFingerprint + + let bounds = lb .. ub + + return (bounds, itemSet) + +suite "Waku Store Sync Codec": + test "empty item set encoding roundtrip": + var origItemSet = ItemSet() + + origItemSet.reconciled = true + + var encodedSet = origItemSet.deltaEncode() + + var itemSet = ItemSet() + let _ = deltaDecode(itemSet, encodedSet, 0) + + check: + origItemSet == itemSet + + test "item set encoding roundtrip": + let + count = 10 + time = getNowInNanosecondTime() + + var rng = initRand() + + let origItemSet = randomItemSet(count, time, rng) + var encodedSet = origItemSet.deltaEncode() + + #faking a longer payload + let pad: seq[byte] = + @[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + encodedSet &= pad + + var itemSet = ItemSet() + let _ = deltaDecode(itemSet, encodedSet, count) + + check: + origItemSet == itemSet + + test "payload item set encoding roundtrip": + let count = 5 + + var + rng = initRand() + time = getNowInNanosecondTime() + + let (bounds1, itemSet1) = randomSetRange(count, time, rng) + let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng) + let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng) + let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng) + + let range1 = (bounds1, RangeType.itemSetRange) + let range2 = (bounds2, RangeType.itemSetRange) + let range3 = (bounds3, RangeType.itemSetRange) + let range4 = (bounds4, RangeType.itemSetRange) + + let payload = SyncPayload( + ranges: @[range1, range2, range3, range4], + fingerprints: @[], + itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4], + ) + + let encodedPayload = payload.deltaEncode() + + let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + + check: + payload.ranges[0][0].b == decodedPayload.ranges[0][0].b + payload.ranges[1][0].b == decodedPayload.ranges[1][0].b + payload.ranges[2][0].b == decodedPayload.ranges[2][0].b + payload.ranges[3][0].b == decodedPayload.ranges[3][0].b + payload.itemSets == decodedPayload.itemSets + + test "payload fingerprint encoding roundtrip": + let count = 4 + + var + rng = initRand() + lastTime = getNowInNanosecondTime() + ranges = newSeqOfCap[(Slice[ID], RangeType)](4) + + for i in 0 ..< count: + let lb = ID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) + #echo "lower bound: " & $lastTime + let nowTime = lastTime + 10_000_000_000 # 10s + #echo "upper bound: " & $nowTime + lastTime = nowTime + let ub = ID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) + let bounds = lb .. ub + let range = (bounds, RangeType.fingerprintRange) + + ranges.add(range) + + let payload = SyncPayload( + ranges: ranges, + fingerprints: + @[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)], + itemSets: @[], + ) + + let encodedPayload = payload.deltaEncode() + #echo "encoding done!" + let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + + check: + payload.ranges[0][0].b == decodedPayload.ranges[0][0].b + payload.ranges[1][0].b == decodedPayload.ranges[1][0].b + payload.ranges[2][0].b == decodedPayload.ranges[2][0].b + payload.ranges[3][0].b == decodedPayload.ranges[3][0].b + payload.fingerprints == decodedPayload.fingerprints + + test "payload mixed encoding roundtrip": + let count = 2 + + var + rng = initRand() + lastTime = getNowInNanosecondTime() + ranges = newSeqOfCap[(Slice[ID], RangeType)](4) + itemSets = newSeqOfCap[ItemSet](4) + fingerprints = newSeqOfCap[Fingerprint](4) + + for i in 1 .. count: + let lb = ID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) + let nowTime = lastTime + 10_000_000_000 # 10s + lastTime = nowTime + let ub = ID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) + let bounds = lb .. ub + let range = (bounds, RangeType.fingerprintRange) + + ranges.add(range) + fingerprints.add(randomHash(rng)) + + let (bound, itemSet) = randomSetRange(5, lastTime, rng) + lastTime += 50_000_000_000 # 50s + + ranges.add((bound, RangeType.itemSetRange)) + itemSets.add(itemSet) + + let payload = + SyncPayload(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets) + + let encodedPayload = payload.deltaEncode() + #echo "encoding done!" + let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + + check: + payload.ranges[0][0].b == decodedPayload.ranges[0][0].b + payload.ranges[1][0].b == decodedPayload.ranges[1][0].b + payload.ranges[2][0].b == decodedPayload.ranges[2][0].b + payload.ranges[3][0].b == decodedPayload.ranges[3][0].b + payload.fingerprints == decodedPayload.fingerprints + payload.itemSets == decodedPayload.itemSets diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim new file mode 100644 index 000000000..d8594503c --- /dev/null +++ b/waku/waku_store_sync/codec.nim @@ -0,0 +1,283 @@ +{.push raises: [].} + +import std/sequtils, stew/leb128 + +import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common + +proc encode*(value: WakuMessagePayload): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, value.pubsub) + pb.write3(2, value.message.encode()) + + return pb + +proc deltaEncode*(itemSet: ItemSet): seq[byte] = + let capacity = 1 + (itemSet.elements.len * 41) + + var + output = newSeqOfCap[byte](capacity) + lastTime = Timestamp(0) + buf = Leb128Buf[uint64]() + + for id in itemSet.elements: + let timeDiff = uint64(id.time) - uint64(lastTime) + lastTime = id.time + + # encode timestamp + buf = timediff.toBytes(Leb128) + output &= @buf + + output &= id.fingerprint + + output &= byte(itemSet.reconciled) + + return output + +proc deltaEncode*(value: SyncPayload): seq[byte] = + if value.ranges.len == 0: + return @[0] + + var + output = newSeqOfCap[byte](1000) + buf = Leb128Buf[uint64]() + lastTimestamp: Timestamp + lastHash: Fingerprint + i = 0 + j = 0 + + # the first range is implicit but must be explicit when encoded + let (bound, _) = value.ranges[0] + + lastTimestamp = bound.a.time + lastHash = bound.a.fingerprint + + # encode first timestamp + buf = uint64(lastTimestamp).toBytes(Leb128) + output &= @buf + + #echo "First Timestamp: " & $lastTimestamp + + # implicit first fingerprint is always 0 and range type is always skip + + for (bound, rangeType) in value.ranges: + let timeDiff = uint64(bound.b.time) - uint64(lastTimestamp) + lastTimestamp = bound.b.time + + # encode timestamp + buf = timeDiff.toBytes(Leb128) + output &= @buf + + #echo "Timestamp: " & $timeDiff + + if timeDiff == 0: + var sameBytes = 0 + for (byte1, byte2) in zip(lastHash, bound.b.fingerprint): + sameBytes.inc() + + if byte1 != byte2: + break + + # encode number of same bytes + output &= byte(sameBytes) + + #echo "Same Bytes: " & $sameBytes + + # encode fingerprint bytes + output &= bound.b.fingerprint[0 ..< sameBytes] + + # encode rangeType + output &= byte(rangeType) + + case rangeType + of skipRange: + #echo "Skip Range" + continue + of fingerprintRange: + let fingerprint = value.fingerprints[i] + i.inc() + + #echo "Encode Fingerprint" + + output &= fingerprint + of itemSetRange: + let itemSet = value.itemSets[j] + j.inc() + + # encode how many elements are in the set + buf = uint64(itemSet.elements.len).toBytes(Leb128) + output &= @buf + + #echo "Set elements: " & $itemSet.elements.len + + let encodedSet = itemSet.deltaEncode() + + #echo "Encoded Set" + + output &= encodedSet + + continue + + return output + +proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int = + var + lastTime = Timestamp(0) + val = 0.uint64 + len = 0.int8 + idx = 0 + + while itemSet.elements.len < setLength: + var slice = buffer[idx ..< idx + 9] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + let time = lastTime + Timestamp(val) + lastTime = time + + #echo "Timestamp: " & $time + #echo "IDX: " & $idx + + slice = buffer[idx ..< idx + 32] + idx += 32 + var fingerprint = EmptyFingerprint + for i, bytes in slice: + fingerprint[i] = bytes + + #echo "Hash: " & $fingerprint + #echo "IDX: " & $idx + + let id = ID(time: time, fingerprint: fingerprint) + + itemSet.elements.add(id) + + itemSet.reconciled = bool(buffer[idx]) + idx += 1 + + #echo "Reconciled: " & $itemSet.reconciled + #echo "IDX: " & $idx + + return idx + +proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = + #echo "buffer length: " & $buffer.len + + if buffer.len == 1: + return SyncPayload() + + var + payload = SyncPayload() + lastTime = Timestamp(0) + val = 0.uint64 + len = 0.int8 + idx = 0 + slice = buffer[idx ..< idx + 9] + + # first timestamp + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + lastTime = Timestamp(val) + + #echo "First Range Timestamp: " & $lastTime + #echo "IDX: " & $idx + + # implicit first fingerprint is always 0 + # implicit first range mode is alway skip + + while idx < buffer.len - 1: + let lowerRangeBound = ID(time: lastTime, fingerprint: EmptyFingerprint) + + # decode timestamp diff + let min = min(idx + 9, buffer.len) + slice = buffer[idx ..< min] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + let timeDiff = Timestamp(val) + + #echo "Range Timestamp diff: " & $timeDiff + #echo "IDX: " & $idx + + var fingerprint = EmptyFingerprint + if timeDiff == 0: + # decode number of same bytes + let sameBytes = int(buffer[idx]) + idx += 1 + + #echo "Same Bytes Count: " & $sameBytes + #echo "IDX: " & $idx + + # decode same bytes + slice = buffer[idx ..< idx + sameBytes] + idx += sameBytes + for i, bytes in slice: + fingerprint[i] = bytes + + #echo "Same Bytes: " & $fingerprint + #echo "IDX: " & $idx + + let thisTime = lastTime + timeDiff + lastTime = thisTime + + #echo "Range Timestamp: " & $thisTime + + let upperRangeBound = ID(time: thisTime, fingerprint: fingerprint) + + let bounds = lowerRangeBound .. upperRangeBound + + # decode range type + let rangeType = RangeType(buffer[idx]) + idx += 1 + + #echo "Range Type: " & $rangeType + #echo "IDX: " & $idx + + payload.ranges.add((bounds, rangeType)) + + if rangeType == fingerprintRange: + # decode fingerprint + slice = buffer[idx ..< idx + 32] + idx += 32 + var fingerprint = EmptyFingerprint + for i, bytes in slice: + fingerprint[i] = bytes + + #echo "Fingerprint: " & $fingerprint + #echo "IDX: " & $idx + + payload.fingerprints.add(fingerprint) + elif rangeType == itemSetRange: + # decode item set length + let min = min(idx + 9, buffer.len) + slice = buffer[idx ..< min] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + let itemSetLength = int(val) + + #echo "Set length: " & $itemSetLength + #echo "IDX: " & $idx + + # decode item set + var itemSet = ItemSet() + slice = buffer[idx ..< buffer.len] + let count = deltaDecode(itemSet, slice, itemSetLength) + idx += count + + #echo "Set Decoded" + #echo "IDX: " & $idx + + payload.itemSets.add(itemSet) + + return payload + +proc decode*(T: type WakuMessagePayload, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + + var pubsub: string + discard ?pb.getField(1, pubsub) + + var proto: ProtoBuffer + discard ?pb.getField(2, proto) + + let message = ?WakuMessage.decode(proto.buffer) + + return ok(WakuMessagePayload(pubsub: pubsub, message: message)) diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim new file mode 100644 index 000000000..fd0bc8faa --- /dev/null +++ b/waku/waku_store_sync/common.nim @@ -0,0 +1,80 @@ +{.push raises: [].} + +import std/[options], chronos, stew/[byteutils] + +import ../waku_core + +const + DefaultSyncInterval*: Duration = 5.minutes + DefaultSyncRange*: Duration = 1.hours + RetryDelay*: Duration = 30.seconds + SyncReconciliationCodec* = "/vac/waku/reconciliation/1.0" + SyncTransferCodec* = "/vac/waku/transfer/1.0" + DefaultGossipSubJitter*: Duration = 20.seconds + +type + Fingerprint* = array[32, byte] + + ID* = object + time*: Timestamp + fingerprint*: Fingerprint + + ItemSet* = object + elements*: seq[ID] + reconciled*: bool + + RangeType* = enum + skipRange = 0 + fingerprintRange = 1 + itemSetRange = 2 + + SyncPayload* = object + ranges*: seq[(Slice[ID], RangeType)] + fingerprints*: seq[Fingerprint] + itemSets*: seq[ItemSet] + + WakuMessagePayload* = object + pubsub*: string + message*: WakuMessage + +const EmptyFingerprint*: Fingerprint = [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, +] + +const FullFingerprint*: Fingerprint = [ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, +] + +proc high*(T: type ID): T = + return ID(time: Timestamp(high(int64)), fingerprint: FullFingerprint) + +proc low*(T: type ID): T = + return ID(time: Timestamp(low(int64)), fingerprint: EmptyFingerprint) + +proc `$`*(value: ID): string = + return '(' & $value.time & ", " & $value.fingerprint & ')' + +proc cmp(x, y: Fingerprint): int = + if x < y: + return -1 + elif x == y: + return 0 + + return 1 + +proc cmp*(x, y: ID): int = + if x.time == y.time: + return cmp(x.fingerprint, y.fingerprint) + + if x.time < y.time: + return -1 + + return 1 + +proc `<`*(x, y: ID): bool = + cmp(x, y) == -1 + +proc `>`*(x, y: ID): bool = + cmp(x, y) == 1