diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 6b9b93b14..0a394775d 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -167,10 +167,10 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} = if msg.libp2pProtocol.len > 0: await daemon.addHandler(@[msg.libp2pProtocol], msg.thunk) -proc readMsg(stream: P2PStream, - MsgType: type, - withResponseCode: bool, - deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.} +proc readChunk(stream: P2PStream, + MsgType: type, + withResponseCode: bool, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.} proc readSizePrefix(transp: StreamTransport, deadline: Future[void]): Future[int] {.async.} = @@ -207,7 +207,7 @@ proc readMsgBytes(stream: P2PStream, logScope: responseCode = ResponseCode(responseCode) case ResponseCode(responseCode) of InvalidRequest, ServerError: - let responseErrMsg = await readMsg(stream, string, false, deadline) + let responseErrMsg = await readChunk(stream, string, false, deadline) debug "P2P request resulted in error", responseErrMsg return of Success: @@ -236,18 +236,40 @@ proc readMsgBytesOrClose(stream: P2PStream, result = await stream.readMsgBytes(withResponseCode, deadline) if result.len == 0: await stream.close() -proc readMsg(stream: P2PStream, - MsgType: type, - withResponseCode: bool, - deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = +proc readChunk(stream: P2PStream, + MsgType: type, + withResponseCode: bool, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = var msgBytes = await stream.readMsgBytesOrClose(withResponseCode, deadline) try: - if msgBytes.len > 0: return some SSZ.decode(msgBytes, MsgType) + if msgBytes.len > 0: + return some SSZ.decode(msgBytes, MsgType) except SerializationError as err: debug "Failed to decode a network message", msgBytes, errMsg = err.formatMsg("") return +proc readResponse( + stream: P2PStream, + MsgType: type, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = + + when MsgType is seq: + type E = ElemType(MsgType) + var results: MsgType + while true: + # This loop will keep reading messages until the deadline is over + # or the other side closes the stream or provides an invalid respose. + # The underlying use of `readMsgBytesOrClose` will ensure that the + # stream is closed on our side as well. + let nextRes = await readChunk(stream, E, true, deadline) + if nextRes.isNone: break + results.add nextRes.get + if results.len > 0: + return some(results) + else: + return await readChunk(stream, MsgType, true, deadline) + proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes = var s = init OutputStream s.append byte(responseCode) @@ -323,7 +345,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, await disconnectAndRaise(peer, FaultOrError, "Incomplete send") # Read the response - return await stream.readMsg(ResponseMsg, true, deadline) + return await stream.readResponse(ResponseMsg, deadline) proc p2pStreamName(MsgType: type): string = mixin msgProtocol, protocolInfo, msgId