mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-07-02 13:59:41 +00:00
feat: replace nim-libp2p protobuf with nim-protobuf-serialization
libp2p was pulled in solely for its `minprotobuf` field codec, dragging in its full transitive tree (secp256k1, nimcrypto, websock, etc.). Swap it for nim-protobuf-serialization, whose low-level `codec` module gives the same protobuf wire primitives without the networking stack. The wire/snapshot codecs build messages by hand at the field level and rely on a backward-compatible decode path the type-driven `Protobuf.encode/decode` API cannot express. To keep that code intact, `protobufutil.nim` is a thin `ProtoBuffer` shim over the new `codec` module, preserving the exact field-level behaviour the codecs depend on (plain varints, length-delimited bytes/strings with no UTF-8 validation, unknown-wire-type fields skipped as protoc does). Dependency closure (nimble.lock, nix/deps.nix) regenerated accordingly: libp2p and its 8 exclusive transitive deps dropped, protobuf_serialization and its npeg dependency added. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
c4d41aa5a5
commit
e2ec4d55b3
158
nimble.lock
158
nimble.lock
@ -23,18 +23,6 @@
|
||||
"sha1": "7e068f119664cf47ad0cfb74ef4c56fb6b616523"
|
||||
}
|
||||
},
|
||||
"bearssl_pkey_decoder": {
|
||||
"version": "0.1.0",
|
||||
"vcsRevision": "21dd3710df9345ed2ad8bf8f882761e07863b8e0",
|
||||
"url": "https://github.com/vacp2p/bearssl_pkey_decoder",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"bearssl"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "21b42e2e6ddca6c875d3fc50f36a5115abf51714"
|
||||
}
|
||||
},
|
||||
"results": {
|
||||
"version": "0.5.1",
|
||||
"vcsRevision": "df8113dda4c2d74d460a8fa98252b0b771bf1f27",
|
||||
@ -85,6 +73,32 @@
|
||||
"sha1": "fa35c1bb76a0a02a2379fe86eaae0957c7527cb8"
|
||||
}
|
||||
},
|
||||
"npeg": {
|
||||
"version": "1.3.0",
|
||||
"vcsRevision": "409f6796d0e880b3f0222c964d1da7de6e450811",
|
||||
"url": "https://github.com/zevv/npeg",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [],
|
||||
"checksums": {
|
||||
"sha1": "64f15c85a059c889cb11c5fe72372677c50da621"
|
||||
}
|
||||
},
|
||||
"protobuf_serialization": {
|
||||
"version": "0.4.0",
|
||||
"vcsRevision": "38d24eb3bd93e605fb88199da71d36b1ec0ad60d",
|
||||
"url": "https://github.com/status-im/nim-protobuf-serialization",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"stew",
|
||||
"faststreams",
|
||||
"serialization",
|
||||
"npeg",
|
||||
"unittest2"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "5a7a80fb8cca29e41899ce9540b74e49c874f8fd"
|
||||
}
|
||||
},
|
||||
"json_serialization": {
|
||||
"version": "0.4.4",
|
||||
"vcsRevision": "c343b0e243d9e17e2c40f3a8a24340f7c4a71d44",
|
||||
@ -157,39 +171,6 @@
|
||||
"sha1": "455802a90204d8ad6b31d53f2efff8ebfe4c834a"
|
||||
}
|
||||
},
|
||||
"dnsclient": {
|
||||
"version": "0.3.4",
|
||||
"vcsRevision": "23214235d4784d24aceed99bbfe153379ea557c8",
|
||||
"url": "https://github.com/ba0f3/dnsclient.nim",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [],
|
||||
"checksums": {
|
||||
"sha1": "65262c7e533ff49d6aca5539da4bc6c6ce132f40"
|
||||
}
|
||||
},
|
||||
"jwt": {
|
||||
"version": "0.2",
|
||||
"vcsRevision": "18f8378de52b241f321c1f9ea905456e89b95c6f",
|
||||
"url": "https://github.com/vacp2p/nim-jwt.git",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"bearssl",
|
||||
"bearssl_pkey_decoder"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "bcfd6fc9c5e10a52b87117219b7ab5c98136bc8e"
|
||||
}
|
||||
},
|
||||
"nimcrypto": {
|
||||
"version": "0.7.3",
|
||||
"vcsRevision": "b3dbc9c4d08e58c5b7bfad6dc7ef2ee52f2f4c08",
|
||||
"url": "https://github.com/cheatfate/nimcrypto",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [],
|
||||
"checksums": {
|
||||
"sha1": "f72b90fe3f4da09efa482de4f8729e7ee4abea2f"
|
||||
}
|
||||
},
|
||||
"metrics": {
|
||||
"version": "0.1.2",
|
||||
"vcsRevision": "11d0cddfb0e711aa2a8c75d1892ae24a64c299fc",
|
||||
@ -204,93 +185,6 @@
|
||||
"sha1": "5cdac99d85d3c146d170e85064c88fb28f377842"
|
||||
}
|
||||
},
|
||||
"secp256k1": {
|
||||
"version": "0.6.0.3.2",
|
||||
"vcsRevision": "d8f1288b7c72f00be5fc2c5ea72bf5cae1eafb15",
|
||||
"url": "https://github.com/status-im/nim-secp256k1",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"stew",
|
||||
"results",
|
||||
"nimcrypto"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "6618ef9de17121846a8c1d0317026b0ce8584e10"
|
||||
}
|
||||
},
|
||||
"zlib": {
|
||||
"version": "0.1.0",
|
||||
"vcsRevision": "e680f269fb01af2c34a2ba879ff281795a5258fe",
|
||||
"url": "https://github.com/status-im/nim-zlib",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"stew",
|
||||
"results"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "bbde4f5a97a84b450fef7d107461e5f35cf2b47f"
|
||||
}
|
||||
},
|
||||
"websock": {
|
||||
"version": "0.2.1",
|
||||
"vcsRevision": "35ae76f1559e835c80f9c1a3943bf995d3dd9eb5",
|
||||
"url": "https://github.com/status-im/nim-websock",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"chronos",
|
||||
"httputils",
|
||||
"chronicles",
|
||||
"stew",
|
||||
"nimcrypto",
|
||||
"bearssl",
|
||||
"results",
|
||||
"zlib"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "1cb5efa10cd389bc01d0707c242ae010c76a03cd"
|
||||
}
|
||||
},
|
||||
"lsquic": {
|
||||
"version": "0.0.1",
|
||||
"vcsRevision": "4fb03ee7bfb39aecb3316889fdcb60bec3d0936f",
|
||||
"url": "https://github.com/vacp2p/nim-lsquic",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"zlib",
|
||||
"stew",
|
||||
"chronos",
|
||||
"nimcrypto",
|
||||
"unittest2",
|
||||
"chronicles"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "f465fa994346490d0924d162f53d9b5aec62f948"
|
||||
}
|
||||
},
|
||||
"libp2p": {
|
||||
"version": "1.15.2",
|
||||
"vcsRevision": "ca48c3718246bb411ff0e354a70cb82d9a28de0d",
|
||||
"url": "https://github.com/vacp2p/nim-libp2p",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
"nimcrypto",
|
||||
"dnsclient",
|
||||
"bearssl",
|
||||
"chronicles",
|
||||
"chronos",
|
||||
"metrics",
|
||||
"secp256k1",
|
||||
"stew",
|
||||
"websock",
|
||||
"unittest2",
|
||||
"results",
|
||||
"lsquic",
|
||||
"jwt"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "3b2cdc7e00261eb4210ca3d44ec3bd64c2b7bbba"
|
||||
}
|
||||
},
|
||||
"stint": {
|
||||
"version": "0.8.2",
|
||||
"vcsRevision": "470b7892561b5179ab20bd389a69217d6213fe58",
|
||||
|
||||
81
nix/deps.nix
81
nix/deps.nix
@ -17,13 +17,6 @@
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
bearssl_pkey_decoder = pkgs.fetchgit {
|
||||
url = "https://github.com/vacp2p/bearssl_pkey_decoder";
|
||||
rev = "21dd3710df9345ed2ad8bf8f882761e07863b8e0";
|
||||
sha256 = "0bl3f147zmkazbhdkr4cj1nipf9rqiw3g4hh1j424k9hpl55zdpg";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
results = pkgs.fetchgit {
|
||||
url = "https://github.com/arnetheduck/nim-results";
|
||||
rev = "df8113dda4c2d74d460a8fa98252b0b771bf1f27";
|
||||
@ -52,6 +45,20 @@
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
npeg = pkgs.fetchgit {
|
||||
url = "https://github.com/zevv/npeg";
|
||||
rev = "409f6796d0e880b3f0222c964d1da7de6e450811";
|
||||
sha256 = "1h2f5znbpa3svk7wsw2axn8f7f59d23xq85z148kiv6fqh0ffwbm";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
protobuf_serialization = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-protobuf-serialization";
|
||||
rev = "38d24eb3bd93e605fb88199da71d36b1ec0ad60d";
|
||||
sha256 = "0jr0a41b4r444si6xfa7bclw8mjsk6id10lrdvbxzp99750zspb9";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
json_serialization = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-json-serialization";
|
||||
rev = "c343b0e243d9e17e2c40f3a8a24340f7c4a71d44";
|
||||
@ -87,27 +94,6 @@
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
dnsclient = pkgs.fetchgit {
|
||||
url = "https://github.com/ba0f3/dnsclient.nim";
|
||||
rev = "23214235d4784d24aceed99bbfe153379ea557c8";
|
||||
sha256 = "03mf3lw5c0m5nq9ppa49nylrl8ibkv2zzlc0wyhqg7w09kz6hks6";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
jwt = pkgs.fetchgit {
|
||||
url = "https://github.com/vacp2p/nim-jwt.git";
|
||||
rev = "18f8378de52b241f321c1f9ea905456e89b95c6f";
|
||||
sha256 = "1986czmszdxj6g9yr7xn1fx8y2y9mwpb3f1bn9nc6973qawsdm0p";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
nimcrypto = pkgs.fetchgit {
|
||||
url = "https://github.com/cheatfate/nimcrypto";
|
||||
rev = "b3dbc9c4d08e58c5b7bfad6dc7ef2ee52f2f4c08";
|
||||
sha256 = "1v4rz42lwcazs6isi3kmjylkisr84mh0kgmlqycx4i885dn3g0l4";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
metrics = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-metrics";
|
||||
rev = "11d0cddfb0e711aa2a8c75d1892ae24a64c299fc";
|
||||
@ -115,41 +101,6 @@
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
secp256k1 = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-secp256k1";
|
||||
rev = "d8f1288b7c72f00be5fc2c5ea72bf5cae1eafb15";
|
||||
sha256 = "1qjrmwbngb73f6r1fznvig53nyal7wj41d1cmqfksrmivk2sgrn2";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
zlib = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-zlib";
|
||||
rev = "e680f269fb01af2c34a2ba879ff281795a5258fe";
|
||||
sha256 = "1xw9f1gjsgqihdg7kdkbaq1wankgnx2vn9l3ihc6nqk2jzv5bvk5";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
websock = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-websock";
|
||||
rev = "35ae76f1559e835c80f9c1a3943bf995d3dd9eb5";
|
||||
sha256 = "1j6dklzb6b6bv2aiglbiyflja2vdpmyxfirv98f49y62mykq0yrw";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
lsquic = pkgs.fetchgit {
|
||||
url = "https://github.com/vacp2p/nim-lsquic";
|
||||
rev = "4fb03ee7bfb39aecb3316889fdcb60bec3d0936f";
|
||||
sha256 = "0qdhcd4hyp185szc9sv3jvwdwc9zp3j0syy7glxv13k9bchfmkfg";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
libp2p = pkgs.fetchgit {
|
||||
url = "https://github.com/vacp2p/nim-libp2p";
|
||||
rev = "ca48c3718246bb411ff0e354a70cb82d9a28de0d";
|
||||
sha256 = "07qfjjrq6w7bj9dbchvcrpla47jidngbrgmigbhl7fh3cfkdabc9";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
stint = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-stint";
|
||||
rev = "470b7892561b5179ab20bd389a69217d6213fe58";
|
||||
@ -166,8 +117,8 @@
|
||||
|
||||
ffi = pkgs.fetchgit {
|
||||
url = "https://github.com/logos-messaging/nim-ffi";
|
||||
rev = "fb25f069d2dfae2b543d79d2c1a81f197de22a2b";
|
||||
sha256 = "0zkjnrm2yjlw27q99kv2x8ll61mbz4nr0cvmyq0csydh43c08k0p";
|
||||
rev = "d4c87c1f94c4678eea7d32a8f5f41c72420fadb6";
|
||||
sha256 = "14dm92l3wl8sc5a108612r1cgjvxksy2chzmn1asph6frl4lm641";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
|
||||
@ -10,7 +10,7 @@ srcDir = "sds"
|
||||
# Dependencies
|
||||
requires "nim >= 2.2.4"
|
||||
requires "chronos >= 4.0.4"
|
||||
requires "libp2p >= 1.15.2"
|
||||
requires "protobuf_serialization >= 0.4.0"
|
||||
requires "chronicles"
|
||||
requires "stew"
|
||||
requires "stint"
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import libp2p/protobuf/minprotobuf
|
||||
import endians
|
||||
import ./types/[sds_message_id, history_entry, sds_message, reliability_error]
|
||||
import ./protobufutil
|
||||
@ -6,7 +5,7 @@ import ./bloom
|
||||
import ./sds_utils
|
||||
|
||||
proc encodeHistoryEntry*(entry: HistoryEntry): ProtoBuffer =
|
||||
var entryPb = initProtoBuffer()
|
||||
var entryPb = ProtoBuffer.init()
|
||||
entryPb.write(1, entry.messageId)
|
||||
if entry.retrievalHint.len > 0:
|
||||
entryPb.write(2, entry.retrievalHint)
|
||||
@ -26,7 +25,7 @@ proc decodeHistoryEntry*(entryPb: ProtoBuffer): ProtobufResult[HistoryEntry] =
|
||||
ok(entry)
|
||||
|
||||
proc encode*(msg: SdsMessage): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
|
||||
pb.write(1, msg.messageId)
|
||||
pb.write(2, uint64(msg.lamportTimestamp))
|
||||
@ -51,7 +50,7 @@ proc encode*(msg: SdsMessage): ProtoBuffer =
|
||||
return pb
|
||||
|
||||
proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
let pb = ProtoBuffer.init(buffer)
|
||||
var msg = SdsMessage.init("", 0, @[], "", @[], @[])
|
||||
|
||||
if not ?pb.getField(1, msg.messageId):
|
||||
@ -67,7 +66,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
if pb.getRepeatedField(3, historyBuffers).isOk():
|
||||
# New format: repeated HistoryEntry
|
||||
for histBuffer in historyBuffers:
|
||||
let entryPb = initProtoBuffer(histBuffer)
|
||||
let entryPb = ProtoBuffer.init(histBuffer)
|
||||
let entry = ?decodeHistoryEntry(entryPb)
|
||||
msg.causalHistory.add(entry)
|
||||
else:
|
||||
@ -95,7 +94,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
var repairBuffers: seq[seq[byte]]
|
||||
if pb.getRepeatedField(13, repairBuffers).isOk():
|
||||
for repairBuffer in repairBuffers:
|
||||
let entryPb = initProtoBuffer(repairBuffer)
|
||||
let entryPb = ProtoBuffer.init(repairBuffer)
|
||||
let entry = ?decodeHistoryEntry(entryPb)
|
||||
msg.repairRequest.add(entry)
|
||||
|
||||
@ -104,7 +103,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
proc extractChannelId*(data: seq[byte]): Result[SdsChannelID, ReliabilityError] =
|
||||
## For extraction of channel ID without full message deserialization
|
||||
try:
|
||||
let pb = initProtoBuffer(data)
|
||||
let pb = ProtoBuffer.init(data)
|
||||
var channelId: SdsChannelID
|
||||
let fieldOk = pb.getField(4, channelId).valueOr:
|
||||
return err(ReliabilityError.reDeserializationError)
|
||||
@ -124,7 +123,7 @@ proc deserializeMessage*(data: seq[byte]): Result[SdsMessage, ReliabilityError]
|
||||
return ok(msg)
|
||||
|
||||
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
|
||||
try:
|
||||
var bytes = newSeq[byte](filter.intArray.len * sizeof(int))
|
||||
@ -149,7 +148,7 @@ proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityEr
|
||||
if data.len == 0:
|
||||
return err(ReliabilityError.reDeserializationError)
|
||||
|
||||
let pb = initProtoBuffer(data)
|
||||
let pb = ProtoBuffer.init(data)
|
||||
var bytes: seq[byte]
|
||||
var cap, errRate, kHashes, mBits: uint64
|
||||
|
||||
|
||||
@ -1,19 +1,175 @@
|
||||
# adapted from https://github.com/waku-org/nwaku/blob/master/waku/common/protobuf.nim
|
||||
# Minimal hand-rolled protobuf field codec, a thin shim over
|
||||
# `nim-protobuf-serialization`'s low-level wire `codec` module.
|
||||
#
|
||||
# `sds/protobuf.nim` and `sds/snapshot_codec.nim` build messages by hand at the
|
||||
# field level — including a backward-compatible decode path the type-driven
|
||||
# `Protobuf.encode/decode` API cannot express — so this exposes a small
|
||||
# accumulating `ProtoBuffer` with `write`/`getField`/`getRepeatedField`/`finish`:
|
||||
# * unsigned integers encode as plain varints (last value wins on decode);
|
||||
# * strings and byte seqs encode length-delimited, with no UTF-8 validation
|
||||
# (strings are treated as opaque bytes — message ids may be binary);
|
||||
# * a field whose stored wire type differs from the requested one is skipped,
|
||||
# as `protoc` does; only a malformed buffer yields an error.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import libp2p/protobuf/minprotobuf
|
||||
import libp2p/varint
|
||||
import results
|
||||
import faststreams/inputs
|
||||
from protobuf_serialization/codec import
|
||||
FieldHeader, WireKind, init, number, kind, toBytes, readHeader, readValue,
|
||||
puint64, pbytes, fixed64, fixed32
|
||||
import ./types/protobuf_error
|
||||
|
||||
export minprotobuf, varint, protobuf_error
|
||||
export results, protobuf_error
|
||||
|
||||
converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError =
|
||||
type ProtoBuffer* = object ## Accumulating protobuf field buffer.
|
||||
buffer*: seq[byte]
|
||||
|
||||
converter toProtobufError*(err: ProtoError): ProtobufError =
|
||||
case err
|
||||
of minprotobuf.ProtoError.RequiredFieldMissing:
|
||||
of ProtoError.RequiredFieldMissing:
|
||||
return ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: "unknown")
|
||||
else:
|
||||
return ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err)
|
||||
|
||||
proc missingRequiredField*(T: type ProtobufError, field: string): T =
|
||||
return ProtobufError.init(field)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Construction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc init*(T: type ProtoBuffer): T =
|
||||
return T(buffer: @[])
|
||||
|
||||
proc init*(T: type ProtoBuffer, data: seq[byte]): T =
|
||||
return T(buffer: data)
|
||||
|
||||
proc finish*(pb: var ProtoBuffer) =
|
||||
## No length prefix is used, so finishing only asserts the invariant that a
|
||||
## top-level buffer is never empty.
|
||||
doAssert(pb.buffer.len > 0)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Writing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc writeVarint(pb: var ProtoBuffer, field: int, value: uint64) =
|
||||
pb.buffer.add(toBytes(FieldHeader.init(field, WireKind.Varint)))
|
||||
pb.buffer.add(toBytes(puint64(value)))
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, value: uint64) =
|
||||
pb.writeVarint(field, value)
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, value: uint32) =
|
||||
pb.writeVarint(field, uint64(value))
|
||||
|
||||
proc writeLengthDelim(pb: var ProtoBuffer, field: int, data: openArray[byte]) =
|
||||
pb.buffer.add(toBytes(FieldHeader.init(field, WireKind.LengthDelim)))
|
||||
pb.buffer.add(toBytes(puint64(uint64(data.len))))
|
||||
if data.len > 0:
|
||||
pb.buffer.add(data)
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, value: openArray[byte]) =
|
||||
pb.writeLengthDelim(field, value)
|
||||
|
||||
proc write*(pb: var ProtoBuffer, field: int, value: string) =
|
||||
pb.writeLengthDelim(field, value.toOpenArrayByte(0, value.high))
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc bytesToString(b: seq[byte]): string =
|
||||
## Copy raw bytes into a string without UTF-8 validation — protobuf strings
|
||||
## are opaque bytes here, and message ids may not be valid UTF-8.
|
||||
var s = newString(b.len)
|
||||
if b.len > 0:
|
||||
copyMem(addr s[0], unsafeAddr b[0], b.len)
|
||||
return s
|
||||
|
||||
proc collectVarints(buffer: seq[byte], field: int): ProtoResult[seq[uint64]] =
|
||||
## All varint values stored at `field`, in order. Mismatched wire types at
|
||||
## the same field number are skipped, as protoc does.
|
||||
var values: seq[uint64]
|
||||
var sh = memoryInput(buffer)
|
||||
try:
|
||||
let stream = sh.s
|
||||
while stream.readable:
|
||||
let hdr = readHeader(stream)
|
||||
if hdr.number == field and hdr.kind == WireKind.Varint:
|
||||
values.add(uint64(readValue(stream, puint64)))
|
||||
else:
|
||||
case hdr.kind
|
||||
of WireKind.Varint: discard readValue(stream, puint64)
|
||||
of WireKind.Fixed64: discard readValue(stream, fixed64)
|
||||
of WireKind.Fixed32: discard readValue(stream, fixed32)
|
||||
of WireKind.LengthDelim: discard readValue(stream, pbytes)
|
||||
except CatchableError:
|
||||
return err(ProtoError.VarintDecode)
|
||||
return ok(values)
|
||||
|
||||
proc collectLengthDelims(buffer: seq[byte], field: int): ProtoResult[seq[seq[byte]]] =
|
||||
## All length-delimited values stored at `field`, in order.
|
||||
var values: seq[seq[byte]]
|
||||
var sh = memoryInput(buffer)
|
||||
try:
|
||||
let stream = sh.s
|
||||
while stream.readable:
|
||||
let hdr = readHeader(stream)
|
||||
if hdr.number == field and hdr.kind == WireKind.LengthDelim:
|
||||
values.add(seq[byte](readValue(stream, pbytes)))
|
||||
else:
|
||||
case hdr.kind
|
||||
of WireKind.Varint: discard readValue(stream, puint64)
|
||||
of WireKind.Fixed64: discard readValue(stream, fixed64)
|
||||
of WireKind.Fixed32: discard readValue(stream, fixed32)
|
||||
of WireKind.LengthDelim: discard readValue(stream, pbytes)
|
||||
except CatchableError:
|
||||
return err(ProtoError.VarintDecode)
|
||||
return ok(values)
|
||||
|
||||
proc getField*(pb: ProtoBuffer, field: int, output: var uint64): ProtoResult[bool] =
|
||||
let values = ?collectVarints(pb.buffer, field)
|
||||
if values.len > 0:
|
||||
output = values[^1]
|
||||
return ok(true)
|
||||
return ok(false)
|
||||
|
||||
proc getField*(pb: ProtoBuffer, field: int, output: var uint32): ProtoResult[bool] =
|
||||
let values = ?collectVarints(pb.buffer, field)
|
||||
if values.len > 0:
|
||||
output = uint32(values[^1])
|
||||
return ok(true)
|
||||
return ok(false)
|
||||
|
||||
proc getField*(pb: ProtoBuffer, field: int, output: var seq[byte]): ProtoResult[bool] =
|
||||
let values = ?collectLengthDelims(pb.buffer, field)
|
||||
if values.len > 0:
|
||||
output = values[^1]
|
||||
return ok(true)
|
||||
return ok(false)
|
||||
|
||||
proc getField*(pb: ProtoBuffer, field: int, output: var string): ProtoResult[bool] =
|
||||
let values = ?collectLengthDelims(pb.buffer, field)
|
||||
if values.len > 0:
|
||||
output = bytesToString(values[^1])
|
||||
return ok(true)
|
||||
return ok(false)
|
||||
|
||||
proc getRepeatedField*(
|
||||
pb: ProtoBuffer, field: int, output: var seq[seq[byte]]
|
||||
): ProtoResult[bool] =
|
||||
output = ?collectLengthDelims(pb.buffer, field)
|
||||
return ok(output.len > 0)
|
||||
|
||||
proc getRepeatedField*(
|
||||
pb: ProtoBuffer, field: int, output: var seq[string]
|
||||
): ProtoResult[bool] =
|
||||
let values = ?collectLengthDelims(pb.buffer, field)
|
||||
output.setLen(0)
|
||||
for v in values:
|
||||
output.add(bytesToString(v))
|
||||
return ok(output.len > 0)
|
||||
|
||||
{.pop.}
|
||||
|
||||
@ -13,7 +13,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[sets, times]
|
||||
import libp2p/protobuf/minprotobuf
|
||||
import ./types/[
|
||||
channel_meta, history_update, sds_message, sds_message_id, history_entry,
|
||||
unacknowledged_message, incoming_message, repair_entry, reliability_error,
|
||||
@ -44,7 +43,7 @@ proc fromUnixMs(ms: int64): Time =
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encodeUnacked(u: UnacknowledgedMessage): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
let msgPb = wire.encode(u.message)
|
||||
pb.write(1, msgPb.buffer)
|
||||
pb.write(2, uint64(u.sendTime.toUnixMs))
|
||||
@ -53,7 +52,7 @@ proc encodeUnacked(u: UnacknowledgedMessage): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decodeUnacked(buf: seq[byte]): ProtobufResult[UnacknowledgedMessage] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var msgBytes: seq[byte]
|
||||
if not ?pb.getField(1, msgBytes):
|
||||
return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message"))
|
||||
@ -77,7 +76,7 @@ proc decodeUnacked(buf: seq[byte]): ProtobufResult[UnacknowledgedMessage] =
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encodeIncoming(m: IncomingMessage): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
let msgPb = wire.encode(m.message)
|
||||
pb.write(1, msgPb.buffer)
|
||||
for dep in m.missingDeps:
|
||||
@ -86,7 +85,7 @@ proc encodeIncoming(m: IncomingMessage): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decodeIncoming(buf: seq[byte]): ProtobufResult[IncomingMessage] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var msgBytes: seq[byte]
|
||||
if not ?pb.getField(1, msgBytes):
|
||||
return err(ProtobufError.missingRequiredField("IncomingMessage.message"))
|
||||
@ -104,7 +103,7 @@ proc decodeIncoming(buf: seq[byte]): ProtobufResult[IncomingMessage] =
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encodeOutRepairEntry(e: OutgoingRepairEntry): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
let histPb = wire.encodeHistoryEntry(e.outHistEntry)
|
||||
pb.write(1, histPb.buffer)
|
||||
pb.write(2, uint64(e.minTimeRepairReq.toUnixMs))
|
||||
@ -112,11 +111,11 @@ proc encodeOutRepairEntry(e: OutgoingRepairEntry): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decodeOutRepairEntry(buf: seq[byte]): ProtobufResult[OutgoingRepairEntry] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var histBytes: seq[byte]
|
||||
if not ?pb.getField(1, histBytes):
|
||||
return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.outHistEntry"))
|
||||
let histPb = initProtoBuffer(histBytes)
|
||||
let histPb = ProtoBuffer.init(histBytes)
|
||||
let entry = ?wire.decodeHistoryEntry(histPb)
|
||||
var ms: uint64
|
||||
if not ?pb.getField(2, ms):
|
||||
@ -128,7 +127,7 @@ proc decodeOutRepairEntry(buf: seq[byte]): ProtobufResult[OutgoingRepairEntry] =
|
||||
)
|
||||
|
||||
proc encodeOutRepairKV(kv: OutgoingRepairKV): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
pb.write(1, kv.messageId)
|
||||
let entryPb = encodeOutRepairEntry(kv.entry)
|
||||
pb.write(2, entryPb.buffer)
|
||||
@ -136,7 +135,7 @@ proc encodeOutRepairKV(kv: OutgoingRepairKV): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decodeOutRepairKV(buf: seq[byte]): ProtobufResult[OutgoingRepairKV] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var msgId: SdsMessageID
|
||||
if not ?pb.getField(1, msgId):
|
||||
return err(ProtobufError.missingRequiredField("OutgoingRepairKV.messageId"))
|
||||
@ -151,7 +150,7 @@ proc decodeOutRepairKV(buf: seq[byte]): ProtobufResult[OutgoingRepairKV] =
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encodeInRepairEntry(e: IncomingRepairEntry): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
let histPb = wire.encodeHistoryEntry(e.inHistEntry)
|
||||
pb.write(1, histPb.buffer)
|
||||
pb.write(2, e.cachedMessage)
|
||||
@ -160,11 +159,11 @@ proc encodeInRepairEntry(e: IncomingRepairEntry): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decodeInRepairEntry(buf: seq[byte]): ProtobufResult[IncomingRepairEntry] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var histBytes: seq[byte]
|
||||
if not ?pb.getField(1, histBytes):
|
||||
return err(ProtobufError.missingRequiredField("IncomingRepairEntry.inHistEntry"))
|
||||
let histPb = initProtoBuffer(histBytes)
|
||||
let histPb = ProtoBuffer.init(histBytes)
|
||||
let entry = ?wire.decodeHistoryEntry(histPb)
|
||||
var cached: seq[byte]
|
||||
if not ?pb.getField(2, cached):
|
||||
@ -181,7 +180,7 @@ proc decodeInRepairEntry(buf: seq[byte]): ProtobufResult[IncomingRepairEntry] =
|
||||
)
|
||||
|
||||
proc encodeInRepairKV(kv: IncomingRepairKV): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
pb.write(1, kv.messageId)
|
||||
let entryPb = encodeInRepairEntry(kv.entry)
|
||||
pb.write(2, entryPb.buffer)
|
||||
@ -189,7 +188,7 @@ proc encodeInRepairKV(kv: IncomingRepairKV): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decodeInRepairKV(buf: seq[byte]): ProtobufResult[IncomingRepairKV] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var msgId: SdsMessageID
|
||||
if not ?pb.getField(1, msgId):
|
||||
return err(ProtobufError.missingRequiredField("IncomingRepairKV.messageId"))
|
||||
@ -204,7 +203,7 @@ proc decodeInRepairKV(buf: seq[byte]): ProtobufResult[IncomingRepairKV] =
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encode*(meta: ChannelMeta): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
pb.write(1, meta.schemaVersion)
|
||||
pb.write(2, uint64(meta.lamportTimestamp))
|
||||
for u in meta.outgoingBuffer:
|
||||
@ -223,7 +222,7 @@ proc encode*(meta: ChannelMeta): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decode*(T: type ChannelMeta, buf: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var meta = ChannelMeta.init()
|
||||
|
||||
var ver: uint32
|
||||
@ -271,7 +270,7 @@ proc deserializeChannelMeta*(
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encode*(d: ChannelData): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
let metaPb = encode(d.meta)
|
||||
pb.write(1, metaPb.buffer)
|
||||
for m in d.messageHistory:
|
||||
@ -281,7 +280,7 @@ proc encode*(d: ChannelData): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decode*(T: type ChannelData, buf: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var d = ChannelData.init()
|
||||
var metaBytes: seq[byte]
|
||||
if not ?pb.getField(1, metaBytes):
|
||||
@ -300,7 +299,7 @@ proc decode*(T: type ChannelData, buf: seq[byte]): ProtobufResult[T] =
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc encode*(u: HistoryUpdate): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
var pb = ProtoBuffer.init()
|
||||
for m in u.append:
|
||||
let msgPb = wire.encode(m)
|
||||
pb.write(1, msgPb.buffer)
|
||||
@ -310,7 +309,7 @@ proc encode*(u: HistoryUpdate): ProtoBuffer =
|
||||
pb
|
||||
|
||||
proc decode*(T: type HistoryUpdate, buf: seq[byte]): ProtobufResult[T] =
|
||||
let pb = initProtoBuffer(buf)
|
||||
let pb = ProtoBuffer.init(buf)
|
||||
var u = HistoryUpdate.init()
|
||||
var appBufs: seq[seq[byte]]
|
||||
discard pb.getRepeatedField(1, appBufs)
|
||||
|
||||
@ -1,7 +1,18 @@
|
||||
import results
|
||||
import libp2p/protobuf/minprotobuf
|
||||
|
||||
type
|
||||
ProtoError* {.pure.} = enum
|
||||
## Low-level protobuf wire decode errors surfaced by the field codec in
|
||||
## `sds/protobufutil.nim`.
|
||||
VarintDecode
|
||||
MessageIncomplete
|
||||
BufferOverflow
|
||||
BadWireType
|
||||
IncorrectBlob
|
||||
RequiredFieldMissing
|
||||
|
||||
ProtoResult*[T] = Result[T, ProtoError]
|
||||
|
||||
ProtobufErrorKind* {.pure.} = enum
|
||||
DecodeFailure
|
||||
MissingRequiredField
|
||||
@ -9,13 +20,13 @@ type
|
||||
ProtobufError* = object
|
||||
case kind*: ProtobufErrorKind
|
||||
of DecodeFailure:
|
||||
error*: minprotobuf.ProtoError
|
||||
error*: ProtoError
|
||||
of MissingRequiredField:
|
||||
field*: string
|
||||
|
||||
ProtobufResult*[T] = Result[T, ProtobufError]
|
||||
|
||||
proc init*(T: type ProtobufError, error: minprotobuf.ProtoError): T =
|
||||
proc init*(T: type ProtobufError, error: ProtoError): T =
|
||||
return T(kind: ProtobufErrorKind.DecodeFailure, error: error)
|
||||
|
||||
proc init*(T: type ProtobufError, field: string): T =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user