Fix for decoding large manifests (#479)
This commit is contained in:
parent
7227a4a38d
commit
3bb5960739
|
@ -35,3 +35,4 @@ codex.nims
|
||||||
nimbus-build-system.paths
|
nimbus-build-system.paths
|
||||||
docker/hostdatadir
|
docker/hostdatadir
|
||||||
docker/prometheus-data
|
docker/prometheus-data
|
||||||
|
.DS_Store
|
||||||
|
|
|
@ -84,6 +84,8 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
|
||||||
await b.inflightSema.acquire()
|
await b.inflightSema.acquire()
|
||||||
trace "Sending message to peer", peer = id
|
trace "Sending message to peer", peer = id
|
||||||
await peer[].send(msg)
|
await peer[].send(msg)
|
||||||
|
except CatchableError as err:
|
||||||
|
error "Error sending message", peer = id, msg = err.msg
|
||||||
finally:
|
finally:
|
||||||
b.inflightSema.release()
|
b.inflightSema.release()
|
||||||
do:
|
do:
|
||||||
|
|
|
@ -15,14 +15,12 @@ import pkg/chronicles
|
||||||
import pkg/libp2p
|
import pkg/libp2p
|
||||||
|
|
||||||
import ../protobuf/blockexc
|
import ../protobuf/blockexc
|
||||||
|
import ../protobuf/message
|
||||||
import ../../errors
|
import ../../errors
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex blockexcnetworkpeer"
|
topics = "codex blockexcnetworkpeer"
|
||||||
|
|
||||||
const
|
|
||||||
MaxMessageSize = 100 * 1 shl 20 # manifest files can be big
|
|
||||||
|
|
||||||
type
|
type
|
||||||
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
|
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
|
||||||
|
|
||||||
|
@ -45,12 +43,12 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
|
||||||
try:
|
try:
|
||||||
while not conn.atEof or not conn.closed:
|
while not conn.atEof or not conn.closed:
|
||||||
let
|
let
|
||||||
data = await conn.readLp(MaxMessageSize)
|
data = await conn.readLp(MaxMessageSize.int)
|
||||||
msg = Message.protobufDecode(data).mapFailure().tryGet()
|
msg = Message.protobufDecode(data).mapFailure().tryGet()
|
||||||
trace "Got message for peer", peer = b.id
|
trace "Got message for peer", peer = b.id
|
||||||
await b.handler(b, msg)
|
await b.handler(b, msg)
|
||||||
except CatchableError as exc:
|
except CatchableError as err:
|
||||||
trace "Exception in blockexc read loop", exc = exc.msg
|
warn "Exception in blockexc read loop", msg = err.msg
|
||||||
finally:
|
finally:
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,11 @@
|
||||||
|
|
||||||
import pkg/libp2p/protobuf/minprotobuf
|
import pkg/libp2p/protobuf/minprotobuf
|
||||||
|
|
||||||
|
import ../../units
|
||||||
|
|
||||||
|
const
|
||||||
|
MaxBlockSize* = 100.MiBs.uint
|
||||||
|
MaxMessageSize* = 100.MiBs.uint
|
||||||
|
|
||||||
type
|
type
|
||||||
WantType* = enum
|
WantType* = enum
|
||||||
|
@ -72,7 +77,7 @@ proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) =
|
||||||
pb.write(field, ipb)
|
pb.write(field, ipb)
|
||||||
|
|
||||||
proc write*(pb: var ProtoBuffer, field: int, value: Block) =
|
proc write*(pb: var ProtoBuffer, field: int, value: Block) =
|
||||||
var ipb = initProtoBuffer()
|
var ipb = initProtoBuffer(maxSize = MaxBlockSize)
|
||||||
ipb.write(1, value.prefix)
|
ipb.write(1, value.prefix)
|
||||||
ipb.write(2, value.data)
|
ipb.write(2, value.data)
|
||||||
ipb.finish()
|
ipb.finish()
|
||||||
|
@ -99,7 +104,7 @@ proc write*(pb: var ProtoBuffer, field: int, value: StateChannelUpdate) =
|
||||||
pb.write(field, ipb)
|
pb.write(field, ipb)
|
||||||
|
|
||||||
proc protobufEncode*(value: Message): seq[byte] =
|
proc protobufEncode*(value: Message): seq[byte] =
|
||||||
var ipb = initProtoBuffer()
|
var ipb = initProtoBuffer(maxSize = MaxMessageSize)
|
||||||
ipb.write(1, value.wantlist)
|
ipb.write(1, value.wantlist)
|
||||||
for v in value.payload:
|
for v in value.payload:
|
||||||
ipb.write(3, v)
|
ipb.write(3, v)
|
||||||
|
@ -175,14 +180,14 @@ proc decode*(_: type StateChannelUpdate, pb: ProtoBuffer): ProtoResult[StateChan
|
||||||
proc protobufDecode*(_: type Message, msg: seq[byte]): ProtoResult[Message] =
|
proc protobufDecode*(_: type Message, msg: seq[byte]): ProtoResult[Message] =
|
||||||
var
|
var
|
||||||
value = Message()
|
value = Message()
|
||||||
pb = initProtoBuffer(msg)
|
pb = initProtoBuffer(msg, maxSize = MaxMessageSize)
|
||||||
ipb: ProtoBuffer
|
ipb: ProtoBuffer
|
||||||
sublist: seq[seq[byte]]
|
sublist: seq[seq[byte]]
|
||||||
if ? pb.getField(1, ipb):
|
if ? pb.getField(1, ipb):
|
||||||
value.wantlist = ? Wantlist.decode(ipb)
|
value.wantlist = ? Wantlist.decode(ipb)
|
||||||
if ? pb.getRepeatedField(3, sublist):
|
if ? pb.getRepeatedField(3, sublist):
|
||||||
for item in sublist:
|
for item in sublist:
|
||||||
value.payload.add(? Block.decode(initProtoBuffer(item)))
|
value.payload.add(? Block.decode(initProtoBuffer(item, maxSize = MaxBlockSize)))
|
||||||
if ? pb.getRepeatedField(4, sublist):
|
if ? pb.getRepeatedField(4, sublist):
|
||||||
for item in sublist:
|
for item in sublist:
|
||||||
value.blockPresences.add(? BlockPresence.decode(initProtoBuffer(item)))
|
value.blockPresences.add(? BlockPresence.decode(initProtoBuffer(item)))
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit a3e9d1ed80c048cd5abc839cbe0863cefcedc702
|
Subproject commit 8c2eca18dcc538c57a8fbc0c53fd0b9d24d56cff
|
Loading…
Reference in New Issue