Add detailed tracing for blocks by range requests
This commit is contained in:
parent
34ed2b0fa4
commit
4c2ded25a6
|
@ -31,7 +31,7 @@ type
|
||||||
|
|
||||||
BeaconNodeConf* = object
|
BeaconNodeConf* = object
|
||||||
logLevel* {.
|
logLevel* {.
|
||||||
defaultValue: enabledLogLevel
|
defaultValue: LogLevel.DEBUG
|
||||||
desc: "Sets the log level."
|
desc: "Sets the log level."
|
||||||
name: "log-level" }: LogLevel
|
name: "log-level" }: LogLevel
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,6 @@ type
|
||||||
|
|
||||||
TransmissionError* = object of CatchableError
|
TransmissionError* = object of CatchableError
|
||||||
|
|
||||||
ResponseSizeLimitReached* = object of CatchableError
|
|
||||||
|
|
||||||
const
|
const
|
||||||
defaultIncomingReqTimeout = 5000
|
defaultIncomingReqTimeout = 5000
|
||||||
HandshakeTimeout = FaultOrError
|
HandshakeTimeout = FaultOrError
|
||||||
|
@ -337,9 +335,6 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
||||||
if sent != bytes.len:
|
if sent != bytes.len:
|
||||||
raise newException(TransmissionError, "Failed to deliver msg bytes")
|
raise newException(TransmissionError, "Failed to deliver msg bytes")
|
||||||
|
|
||||||
template raiseMaxRespSizeError =
|
|
||||||
raise newException(ResponseSizeLimitReached, "Response size limit reached")
|
|
||||||
|
|
||||||
# TODO There is too much duplication in the responder functions, but
|
# TODO There is too much duplication in the responder functions, but
|
||||||
# I hope to reduce this when I increse the reliance on output streams.
|
# I hope to reduce this when I increse the reliance on output streams.
|
||||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||||
|
@ -604,12 +599,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
try:
|
try:
|
||||||
`tracing`
|
`tracing`
|
||||||
`awaitUserHandler`
|
`awaitUserHandler`
|
||||||
except ResponseSizeLimitReached:
|
|
||||||
# The response size limit is currently handled with an exception in
|
|
||||||
# order to make it easier to switch to an alternative policy when it
|
|
||||||
# will be signalled with an error response code (and to avoid making
|
|
||||||
# the `response` API in the high-level protocols more complicated for now).
|
|
||||||
chronicles.debug "response size limit reached", peer, reqName = `msgNameLit`
|
|
||||||
except CatchableError as `errVar`:
|
except CatchableError as `errVar`:
|
||||||
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
|
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,6 @@ type
|
||||||
|
|
||||||
TransmissionError* = object of CatchableError
|
TransmissionError* = object of CatchableError
|
||||||
|
|
||||||
ResponseSizeLimitReached* = object of CatchableError
|
|
||||||
|
|
||||||
const
|
const
|
||||||
defaultIncomingReqTimeout = 5000
|
defaultIncomingReqTimeout = 5000
|
||||||
HandshakeTimeout = FaultOrError
|
HandshakeTimeout = FaultOrError
|
||||||
|
@ -187,21 +185,26 @@ proc readChunk(stream: P2PStream,
|
||||||
|
|
||||||
proc readSizePrefix(transp: StreamTransport,
|
proc readSizePrefix(transp: StreamTransport,
|
||||||
deadline: Future[void]): Future[int] {.async.} =
|
deadline: Future[void]): Future[int] {.async.} =
|
||||||
|
trace "about to read msg size prefix"
|
||||||
var parser: VarintParser[uint64, ProtoBuf]
|
var parser: VarintParser[uint64, ProtoBuf]
|
||||||
while true:
|
while true:
|
||||||
var nextByte: byte
|
var nextByte: byte
|
||||||
var readNextByte = transp.readExactly(addr nextByte, 1)
|
var readNextByte = transp.readExactly(addr nextByte, 1)
|
||||||
await readNextByte or deadline
|
await readNextByte or deadline
|
||||||
if not readNextByte.finished:
|
if not readNextByte.finished:
|
||||||
|
trace "size prefix byte not received in time"
|
||||||
return -1
|
return -1
|
||||||
case parser.feedByte(nextByte)
|
case parser.feedByte(nextByte)
|
||||||
of Done:
|
of Done:
|
||||||
let res = parser.getResult
|
let res = parser.getResult
|
||||||
if res > uint64(REQ_RESP_MAX_SIZE):
|
if res > uint64(REQ_RESP_MAX_SIZE):
|
||||||
|
trace "size prefix outside of range", res
|
||||||
return -1
|
return -1
|
||||||
else:
|
else:
|
||||||
|
trace "got size prefix", res
|
||||||
return int(res)
|
return int(res)
|
||||||
of Overflow:
|
of Overflow:
|
||||||
|
trace "size prefix overflow"
|
||||||
return -1
|
return -1
|
||||||
of Incomplete:
|
of Incomplete:
|
||||||
continue
|
continue
|
||||||
|
@ -209,26 +212,39 @@ proc readSizePrefix(transp: StreamTransport,
|
||||||
proc readMsgBytes(stream: P2PStream,
|
proc readMsgBytes(stream: P2PStream,
|
||||||
withResponseCode: bool,
|
withResponseCode: bool,
|
||||||
deadline: Future[void]): Future[Bytes] {.async.} =
|
deadline: Future[void]): Future[Bytes] {.async.} =
|
||||||
|
trace "about to read message bytes", withResponseCode
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if withResponseCode:
|
if withResponseCode:
|
||||||
var responseCode: byte
|
var responseCode: byte
|
||||||
|
trace "about to read response code"
|
||||||
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
|
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
|
||||||
await readResponseCode or deadline
|
await readResponseCode or deadline
|
||||||
|
|
||||||
if not readResponseCode.finished:
|
if not readResponseCode.finished:
|
||||||
|
trace "response code not received in time"
|
||||||
|
return
|
||||||
|
|
||||||
|
if responseCode > ResponseCode.high.byte:
|
||||||
|
trace "invalid response code", responseCode
|
||||||
return
|
return
|
||||||
if responseCode > ResponseCode.high.byte: return
|
|
||||||
|
|
||||||
logScope: responseCode = ResponseCode(responseCode)
|
logScope: responseCode = ResponseCode(responseCode)
|
||||||
|
trace "got response code"
|
||||||
|
|
||||||
case ResponseCode(responseCode)
|
case ResponseCode(responseCode)
|
||||||
of InvalidRequest, ServerError:
|
of InvalidRequest, ServerError:
|
||||||
let responseErrMsg = await readChunk(stream, string, false, deadline)
|
let responseErrMsg = await readChunk(stream, string, false, deadline)
|
||||||
debug "P2P request resulted in error", responseErrMsg
|
debug "P2P request resulted in error", responseErrMsg
|
||||||
return
|
return
|
||||||
|
|
||||||
of Success:
|
of Success:
|
||||||
# The response is OK, the execution continues below
|
# The response is OK, the execution continues below
|
||||||
discard
|
discard
|
||||||
|
|
||||||
var sizePrefix = await readSizePrefix(stream.transp, deadline)
|
var sizePrefix = await readSizePrefix(stream.transp, deadline)
|
||||||
|
trace "got msg size prefix", sizePrefix
|
||||||
|
|
||||||
if sizePrefix == -1:
|
if sizePrefix == -1:
|
||||||
debug "Failed to read an incoming message size prefix", peer = stream.peer
|
debug "Failed to read an incoming message size prefix", peer = stream.peer
|
||||||
return
|
return
|
||||||
|
@ -237,12 +253,17 @@ proc readMsgBytes(stream: P2PStream,
|
||||||
debug "Received SSZ with zero size", peer = stream.peer
|
debug "Received SSZ with zero size", peer = stream.peer
|
||||||
return
|
return
|
||||||
|
|
||||||
|
trace "about to read msg bytes"
|
||||||
var msgBytes = newSeq[byte](sizePrefix)
|
var msgBytes = newSeq[byte](sizePrefix)
|
||||||
var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix)
|
var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix)
|
||||||
await readBody or deadline
|
await readBody or deadline
|
||||||
if not readBody.finished: return
|
if not readBody.finished:
|
||||||
|
trace "msg bytes not received in time"
|
||||||
|
return
|
||||||
|
|
||||||
|
trace "got message bytes", msgBytes
|
||||||
return msgBytes
|
return msgBytes
|
||||||
|
|
||||||
except TransportIncompleteError:
|
except TransportIncompleteError:
|
||||||
return @[]
|
return @[]
|
||||||
|
|
||||||
|
@ -337,9 +358,6 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
||||||
if sent != bytes.len:
|
if sent != bytes.len:
|
||||||
raise newException(TransmissionError, "Failed to deliver msg bytes")
|
raise newException(TransmissionError, "Failed to deliver msg bytes")
|
||||||
|
|
||||||
template raiseMaxRespSizeError =
|
|
||||||
raise newException(ResponseSizeLimitReached, "Response size limit reached")
|
|
||||||
|
|
||||||
# TODO There is too much duplication in the responder functions, but
|
# TODO There is too much duplication in the responder functions, but
|
||||||
# I hope to reduce this when I increse the reliance on output streams.
|
# I hope to reduce this when I increse the reliance on output streams.
|
||||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||||
|
@ -572,6 +590,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
msg.defineThunk quote do:
|
msg.defineThunk quote do:
|
||||||
proc `thunkName`(`daemonVar`: `DaemonAPI`,
|
proc `thunkName`(`daemonVar`: `DaemonAPI`,
|
||||||
`streamVar`: `P2PStream`) {.async, gcsafe.} =
|
`streamVar`: `P2PStream`) {.async, gcsafe.} =
|
||||||
|
when `msgNameLit` == "beaconBlocksByRange":
|
||||||
|
setLogLevel(LogLevel.TRACE)
|
||||||
|
defer: setLogLevel(LogLevel.DEBUG)
|
||||||
|
trace "incoming beaconBlocksByRange stream"
|
||||||
|
|
||||||
defer:
|
defer:
|
||||||
`await` safeClose(`streamVar`)
|
`await` safeClose(`streamVar`)
|
||||||
|
|
||||||
|
@ -587,6 +610,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
|
|
||||||
var `msgVar`: `msgRecName`
|
var `msgVar`: `msgRecName`
|
||||||
try:
|
try:
|
||||||
|
trace "about to decode incoming msg"
|
||||||
`msgVar` = decode(`Format`, `msgBytesVar`, `msgRecName`)
|
`msgVar` = decode(`Format`, `msgBytesVar`, `msgRecName`)
|
||||||
except SerializationError as `errVar`:
|
except SerializationError as `errVar`:
|
||||||
`await` sendErrorResponse(`peerVar`, `streamVar`, `errVar`,
|
`await` sendErrorResponse(`peerVar`, `streamVar`, `errVar`,
|
||||||
|
@ -603,13 +627,8 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||||
|
|
||||||
try:
|
try:
|
||||||
`tracing`
|
`tracing`
|
||||||
|
trace "about to execute user handler"
|
||||||
`awaitUserHandler`
|
`awaitUserHandler`
|
||||||
except ResponseSizeLimitReached:
|
|
||||||
# The response size limit is currently handled with an exception in
|
|
||||||
# order to make it easier to switch to an alternative policy when it
|
|
||||||
# will be signalled with an error response code (and to avoid making
|
|
||||||
# the `response` API in the high-level protocols more complicated for now).
|
|
||||||
chronicles.debug "response size limit reached", peer, reqName = `msgNameLit`
|
|
||||||
except CatchableError as `errVar`:
|
except CatchableError as `errVar`:
|
||||||
try:
|
try:
|
||||||
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
|
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
|
||||||
|
|
Loading…
Reference in New Issue