Fix improper yield usage in rlpx and refine exception handling (#679)

* Fix improper yield usage in rlpx and refine exception handling

* Handle post hello step error
This commit is contained in:
andri lim 2024-02-19 14:16:33 +07:00 committed by GitHub
parent efe610e27f
commit d8209f623f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 83 additions and 39 deletions

View File

@ -104,7 +104,13 @@ proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
let f = d.transp.sendTo(ta, data) let f = d.transp.sendTo(ta, data)
let cb = proc(data: pointer) {.gcsafe.} = let cb = proc(data: pointer) {.gcsafe.} =
if f.failed: if f.failed:
debug "Discovery send failed", msg = f.readError.msg when defined(chronicles_log_level):
try:
# readError will raise FutureError
debug "Discovery send failed", msg = f.readError.msg
except FutureError as exc:
error "Failed to get discovery send future error", msg=exc.msg
f.addCallback cb f.addCallback cb
proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] = proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =

View File

@ -335,8 +335,12 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
userHandlerProc.addPragma ident"gcsafe" userHandlerProc.addPragma ident"gcsafe"
# we only take the pragma
let dummy = quote do:
proc dummy(): Future[void] {.async: (raises: [EthP2PError]).}
if p.isRlpx: if p.isRlpx:
userHandlerProc.addPragma ident"async" userHandlerProc.addPragma dummy.pragma[0]
var var
getState = ident"getState" getState = ident"getState"
@ -375,7 +379,17 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
template networkState(`peerVar`: `PeerType`): `NetworkStateType` {.used.} = template networkState(`peerVar`: `PeerType`): `NetworkStateType` {.used.} =
`NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`)) `NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`))
proc addPreludeDefs*(userHandlerProc: NimNode, definitions: NimNode) = proc addExceptionHandler(userHandlerProc: NimNode) =
let bodyTemp = userHandlerProc.body
userHandlerProc.body = quote do:
try:
`bodyTemp`
except CancelledError as exc:
raise newException(EthP2PError, exc.msg)
except CatchableError as exc:
raise newException(EthP2PError, exc.msg)
proc addPreludeDefs(userHandlerProc: NimNode, definitions: NimNode) =
userHandlerProc.body[0].add definitions userHandlerProc.body[0].add definitions
proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode = proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode =
@ -385,6 +399,7 @@ proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string):
doBlock.copyChildrenTo(result) doBlock.copyChildrenTo(result)
result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName) result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName)
p.augmentUserHandler result p.augmentUserHandler result
result.addExceptionHandler()
proc addTimeoutParam(procDef: NimNode, defaultValue: int64) = proc addTimeoutParam(procDef: NimNode, defaultValue: int64) =
var var
@ -470,7 +485,8 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
if protocol.useRequestIds: if protocol.useRequestIds:
initResponderCall.add reqIdVar initResponderCall.add reqIdVar
userHandler.addPreludeDefs newVarStmt(responseVar, initResponderCall) userHandler.addPreludeDefs quote do:
var `responseVar` {.used.} = `initResponderCall`
result.initResponderCall = initResponderCall result.initResponderCall = initResponderCall
@ -479,6 +495,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
of msgResponse: userHandler.applyDecorator protocol.incomingResponseDecorator of msgResponse: userHandler.applyDecorator protocol.incomingResponseDecorator
else: discard else: discard
userHandler.addExceptionHandler()
result.userHandler = userHandler result.userHandler = userHandler
protocol.outRecvProcs.add result.userHandler protocol.outRecvProcs.add result.userHandler
@ -796,12 +813,17 @@ proc createHandshakeTemplate*(msg: Message,
let peerVar = genSym(nskLet ,"peer") let peerVar = genSym(nskLet ,"peer")
handshakeExchanger.setBody quote do: handshakeExchanger.setBody quote do:
let `peerVar` = `peerValue` try:
let sendingFuture = `forwardCall` let `peerVar` = `peerValue`
`handshakeImpl`(`peerVar`, let sendingFuture = `forwardCall`
sendingFuture, `handshakeImpl`(`peerVar`,
`nextMsg`(`peerVar`, `msgRecName`), sendingFuture,
`timeoutVar`) `nextMsg`(`peerVar`, `msgRecName`),
`timeoutVar`)
except PeerDisconnected as exc:
raise newException(EthP2PError, exc.msg)
except P2PInternalError as exc:
raise newException(EthP2PError, exc.msg)
return handshakeExchanger return handshakeExchanger

View File

@ -84,17 +84,21 @@ type
name*: string name*: string
version*: int version*: int
UnsupportedProtocol* = object of Defect EthP2PError* = object of CatchableError
UnsupportedProtocol* = object of EthP2PError
# This is raised when you attempt to send a message from a particular # This is raised when you attempt to send a message from a particular
# protocol to a peer that doesn't support the protocol. # protocol to a peer that doesn't support the protocol.
MalformedMessageError* = object of CatchableError MalformedMessageError* = object of EthP2PError
UnsupportedMessageError* = object of CatchableError UnsupportedMessageError* = object of EthP2PError
PeerDisconnected* = object of CatchableError PeerDisconnected* = object of EthP2PError
reason*: DisconnectionReason reason*: DisconnectionReason
UselessPeerError* = object of CatchableError UselessPeerError* = object of EthP2PError
P2PInternalError* = object of EthP2PError
## ##
## Quasy-private types. Use at your own risk. ## Quasy-private types. Use at your own risk.
@ -155,7 +159,7 @@ type
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void] ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void]
{.gcsafe, async: (raises: [RlpError, CatchableError]).} {.gcsafe, async: (raises: [RlpError, EthP2PError]).}
MessageContentPrinter* = proc(msg: pointer): string MessageContentPrinter* = proc(msg: pointer): string
{.gcsafe, raises: [].} {.gcsafe, raises: [].}
@ -173,10 +177,10 @@ type
{.gcsafe, raises: [].} {.gcsafe, raises: [].}
HandshakeStep* = proc(peer: Peer): Future[void] HandshakeStep* = proc(peer: Peer): Future[void]
{.gcsafe, raises: [].} {.gcsafe, async: (raises: [EthP2PError]).}
DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason): DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason):
Future[void] {.gcsafe, raises: [].} Future[void] {.gcsafe, async: (raises: [EthP2PError]).}
ConnectionState* = enum ConnectionState* = enum
None, None,

View File

@ -176,7 +176,7 @@ proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
# result = $(cast[ptr MsgType](msg)[]) # result = $(cast[ptr MsgType](msg)[])
proc disconnect*(peer: Peer, reason: DisconnectionReason, proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async: (raises:[CatchableError]).} notifyOtherPeer = false) {.async: (raises:[]).}
template raisePeerDisconnected(msg: string, r: DisconnectionReason) = template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg) var e = newException(PeerDisconnected, msg)
@ -185,7 +185,8 @@ template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
proc disconnectAndRaise(peer: Peer, proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason, reason: DisconnectionReason,
msg: string) {.async.} = msg: string) {.async:
(raises: [PeerDisconnected]).} =
let r = reason let r = reason
await peer.disconnect(r) await peer.disconnect(r)
raisePeerDisconnected(msg, r) raisePeerDisconnected(msg, r)
@ -193,24 +194,26 @@ proc disconnectAndRaise(peer: Peer,
proc handshakeImpl[T](peer: Peer, proc handshakeImpl[T](peer: Peer,
sendFut: Future[void], sendFut: Future[void],
responseFut: Future[T], responseFut: Future[T],
timeout: Duration): Future[T] {.async.} = timeout: Duration): Future[T] {.async:
(raises: [PeerDisconnected, P2PInternalError]).} =
sendFut.addCallback do (arg: pointer) {.gcsafe.}: sendFut.addCallback do (arg: pointer) {.gcsafe.}:
if sendFut.failed: if sendFut.failed:
debug "Handshake message not delivered", peer debug "Handshake message not delivered", peer
doAssert timeout.milliseconds > 0 doAssert timeout.milliseconds > 0
yield responseFut or sleepAsync(timeout)
if not responseFut.finished: try:
let res = await responseFut.wait(timeout)
return res
except AsyncTimeoutError:
# TODO: Really shouldn't disconnect and raise everywhere. In order to avoid # TODO: Really shouldn't disconnect and raise everywhere. In order to avoid
# understanding what error occured where. # understanding what error occured where.
# And also, incoming and outgoing disconnect errors should be seperated, # And also, incoming and outgoing disconnect errors should be seperated,
# probably by seperating the actual disconnect call to begin with. # probably by seperating the actual disconnect call to begin with.
await disconnectAndRaise(peer, HandshakeTimeout, await disconnectAndRaise(peer, HandshakeTimeout,
"Protocol handshake was not received in time.") "Protocol handshake was not received in time.")
elif responseFut.failed: except CatchableError as exc:
raise responseFut.error raise newException(P2PInternalError, exc.msg)
else:
return responseFut.read
# Dispatcher # Dispatcher
# #
@ -363,7 +366,7 @@ template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId) perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
proc invokeThunk*(peer: Peer, msgId: int, msgData: Rlp): Future[void] proc invokeThunk*(peer: Peer, msgId: int, msgData: Rlp): Future[void]
{.async: (raises: [CatchableError, rlp.RlpError]).} = {.async: (raises: [rlp.RlpError, EthP2PError]).} =
template invalidIdError: untyped = template invalidIdError: untyped =
raise newException(UnsupportedMessageError, raise newException(UnsupportedMessageError,
"RLPx message with an invalid id " & $msgId & "RLPx message with an invalid id " & $msgId &
@ -393,6 +396,7 @@ proc sendMsg*(peer: Peer, data: seq[byte]) {.async.} =
if res != len(cipherText): if res != len(cipherText):
# This is ECONNRESET or EPIPE case when remote peer disconnected. # This is ECONNRESET or EPIPE case when remote peer disconnected.
await peer.disconnect(TcpError) await peer.disconnect(TcpError)
discard
except CatchableError as e: except CatchableError as e:
await peer.disconnect(TcpError) await peer.disconnect(TcpError)
raise e raise e
@ -948,7 +952,7 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp) proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp)
# Fun error if you just use `RlpError` instead of `rlp.RlpError`: # Fun error if you just use `RlpError` instead of `rlp.RlpError`:
# "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'" # "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'"
{.async: (raises: [rlp.RlpError, CatchableError]).} = {.async: (raises: [rlp.RlpError, EthP2PError]).} =
var `receivedRlp` = data var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecName` var `receivedMsg` {.noinit.}: `msgRecName`
`readParamsPrelude` `readParamsPrelude`
@ -1067,14 +1071,14 @@ proc removePeer(network: EthereumNode, peer: Peer) =
observer.onPeerDisconnected(peer) observer.onPeerDisconnected(peer)
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason):
Future[void] {.async.} = Future[void] {.async: (raises: []).} =
var futures = newSeqOfCap[Future[void]](protocolCount()) var futures = newSeqOfCap[Future[void]](protocolCount())
for protocol in peer.dispatcher.activeProtocols: for protocol in peer.dispatcher.activeProtocols:
if protocol.disconnectHandler != nil: if protocol.disconnectHandler != nil:
futures.add((protocol.disconnectHandler)(peer, reason)) futures.add((protocol.disconnectHandler)(peer, reason))
await allFutures(futures) await noCancel allFutures(futures)
for f in futures: for f in futures:
doAssert(f.finished()) doAssert(f.finished())
@ -1082,7 +1086,7 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason):
trace "Disconnection handler ended with an error", err = f.error.msg trace "Disconnection handler ended with an error", err = f.error.msg
proc disconnect*(peer: Peer, reason: DisconnectionReason, proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async: (raises: [CatchableError]).} = notifyOtherPeer = false) {.async: (raises: []).} =
if peer.connectionState notin {Disconnecting, Disconnected}: if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting peer.connectionState = Disconnecting
# Do this first so sub-protocols have time to clean up and stop sending # Do this first so sub-protocols have time to clean up and stop sending
@ -1094,15 +1098,16 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
await callDisconnectHandlers(peer, reason) await callDisconnectHandlers(peer, reason)
if notifyOtherPeer and not peer.transport.closed: if notifyOtherPeer and not peer.transport.closed:
var fut = peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
yield fut
if fut.failed:
debug "Failed to deliver disconnect message", peer
proc waitAndClose(peer: Peer, time: Duration) {.async.} = proc waitAndClose(peer: Peer, time: Duration) {.async.} =
await sleepAsync(time) await sleepAsync(time)
await peer.transport.closeWait() await peer.transport.closeWait()
try:
await peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
except CatchableError as exc:
debug "Failed to deliver disconnect message", peer, msg=exc.msg
# Give the peer a chance to disconnect # Give the peer a chance to disconnect
traceAsyncErrors peer.waitAndClose(2.seconds) traceAsyncErrors peer.waitAndClose(2.seconds)
elif not peer.transport.closed: elif not peer.transport.closed:
@ -1336,7 +1341,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
10.seconds) 10.seconds)
except RlpError: except RlpError:
return err(ProtocolError) return err(ProtocolError)
except PeerDisconnected as e: except PeerDisconnected:
return err(PeerDisconnectedError) return err(PeerDisconnectedError)
# TODO: Strange compiler error # TODO: Strange compiler error
# case e.reason: # case e.reason:
@ -1350,6 +1355,8 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
# return err(PeerDisconnectedError) # return err(PeerDisconnectedError)
except TransportError: except TransportError:
return err(P2PTransportError) return err(P2PTransportError)
except P2PInternalError:
return err(P2PHandshakeError)
except CatchableError as e: except CatchableError as e:
raiseAssert($e.name & " " & $e.msg) raiseAssert($e.name & " " & $e.msg)
@ -1374,6 +1381,8 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
return err(UselessRlpxPeerError) return err(UselessRlpxPeerError)
except TransportError: except TransportError:
return err(P2PTransportError) return err(P2PTransportError)
except EthP2PError:
return err(ProtocolError)
except CatchableError as e: except CatchableError as e:
raiseAssert($e.name & " " & $e.msg) raiseAssert($e.name & " " & $e.msg)
@ -1385,7 +1394,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
# TODO: rework rlpxAccept similar to rlpxConnect. # TODO: rework rlpxAccept similar to rlpxConnect.
proc rlpxAccept*( proc rlpxAccept*(
node: EthereumNode, transport: StreamTransport): Future[Peer] {.async: (raises: [CatchableError]).} = node: EthereumNode, transport: StreamTransport): Future[Peer] {.async: (raises: []).} =
initTracing(devp2pInfo, node.protocols) initTracing(devp2pInfo, node.protocols)
let peer = Peer(transport: transport, network: node) let peer = Peer(transport: transport, network: node)

View File

@ -33,7 +33,10 @@ p2pProtocol eth(version = 63,
genesisHash: KeccakHash) genesisHash: KeccakHash)
requestResponse: requestResponse:
proc getBlockHeaders(peer: Peer, request: openArray[KeccakHash]) {.gcsafe.} = discard proc getBlockHeaders(peer: Peer, request: openArray[KeccakHash]) {.gcsafe.} =
var headers: seq[BlockHeader]
await response.send(headers)
proc blockHeaders(p: Peer, headers: openArray[BlockHeader]) proc blockHeaders(p: Peer, headers: openArray[BlockHeader])
requestResponse: requestResponse: