Improved error-handling; Timeouts in all handshakes

This commit is contained in:
Zahary Karadjov 2019-03-26 15:07:50 +02:00 committed by zah
parent d1737f8669
commit dc2b6170b5
4 changed files with 87 additions and 50 deletions

View File

@ -64,16 +64,16 @@ type
name*: string name*: string
version*: int version*: int
UnsupportedProtocol* = object of Exception UnsupportedProtocol* = object of Defect
# This is raised when you attempt to send a message from a particular # This is raised when you attempt to send a message from a particular
# protocol to a peer that doesn't support the protocol. # protocol to a peer that doesn't support the protocol.
MalformedMessageError* = object of Exception MalformedMessageError* = object of CatchableError
PeerDisconnected* = object of Exception PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason reason*: DisconnectionReason
UselessPeerError* = object of Exception UselessPeerError* = object of CatchableError
## ##
## Quasi-private types. Use at your own risk. ## Quasi-private types. Use at your own risk.

View File

@ -37,7 +37,7 @@ template devp2pInfo: auto = {.gcsafe.}: gDevp2pInfo
chronicles.formatIt(Peer): $(it.remote) chronicles.formatIt(Peer): $(it.remote)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.gcsafe, async.} proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.gcsafe, async.}
template raisePeerDisconnected(msg: string, r: DisconnectionReason) = template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg) var e = newException(PeerDisconnected, msg)
@ -278,9 +278,8 @@ template compressMsg(peer: Peer, data: Bytes): Bytes =
data data
proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} =
var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState)
try: try:
var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState)
discard await peer.transport.write(cipherText) discard await peer.transport.write(cipherText)
except: except:
await peer.disconnect(TcpError) await peer.disconnect(TcpError)
@ -483,7 +482,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
elif nextMsgId == 1: # p2p.disconnect elif nextMsgId == 1: # p2p.disconnect
let reason = DisconnectionReason nextMsgData.listElem(0).toInt(uint32) let reason = DisconnectionReason nextMsgData.listElem(0).toInt(uint32)
await peer.disconnect(reason, notifyOtherPeer = false) await peer.disconnect(reason)
raisePeerDisconnected("Unexpected disconnect", reason) raisePeerDisconnected("Unexpected disconnect", reason)
else: else:
warn "Dropped RLPX message", warn "Dropped RLPX message",
@ -511,7 +510,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
if msgId == 1: # p2p.disconnect if msgId == 1: # p2p.disconnect
await peer.transport.closeWait() await peer.transport.closeWait()
let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason
await peer.disconnect(reason, notifyOtherPeer = false) await peer.disconnect(reason)
break break
try: try:
@ -520,10 +519,25 @@ proc dispatchMessages*(peer: Peer) {.async.} =
debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg() debug "ending dispatchMessages loop", peer, err = getCurrentExceptionMsg()
await peer.disconnect(BreachOfProtocol) await peer.disconnect(BreachOfProtocol)
return return
except CatchableError:
warn "Error while handling RLPx message", peer,
msg = peer.getMsgName(msgId),
err = getCurrentExceptionMsg()
# TODO: Hmm, this can be safely moved into the message handler thunk.
# The documentation will need to be updated, explaining the fact that
# nextMsg will be resolved only if the message handler has executed
# successfully.
if peer.awaitedMessages[msgId] != nil: if peer.awaitedMessages[msgId] != nil:
let msgInfo = peer.dispatcher.messages[msgId] let msgInfo = peer.dispatcher.messages[msgId]
try:
(msgInfo.nextMsgResolver)(msgData, peer.awaitedMessages[msgId]) (msgInfo.nextMsgResolver)(msgData, peer.awaitedMessages[msgId])
except:
# TODO: Handling errors here must be investigated more carefully.
# They also are supposed to be handled at the call-site where
# `nextMsg` is used.
debug "nextMsg resolver failed", err = getCurrentExceptionMsg()
raise
peer.awaitedMessages[msgId] = nil peer.awaitedMessages[msgId] = nil
macro p2pProtocolImpl(name: static[string], macro p2pProtocolImpl(name: static[string],
@ -1103,20 +1117,29 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[voi
return all(futures) return all(futures)
proc handshakeImpl(peer: Peer, proc handshakeImpl[T](peer: Peer,
handshakeSendFut: Future[void], sendFut: Future[void],
timeout: Duration, responseFut: Future[T],
HandshakeType: type): Future[HandshakeType] {.async.} = timeout: Duration): Future[T] {.async.} =
asyncCheck handshakeSendFut sendFut.addCallback do (arg: pointer):
var response = nextMsg(peer, HandshakeType) if sendFut.failed:
if timeout.milliseconds > 0: debug "Handshake message not delivered", peer
await response or sleepAsync(timeout)
if not response.finished: doAssert timeout.milliseconds > 0
yield responseFut or sleepAsync(timeout)
if not responseFut.finished:
discard disconnectAndRaise(peer, BreachOfProtocol, discard disconnectAndRaise(peer, BreachOfProtocol,
"sub-protocol handshake was not received in time.") "Protocol handshake was not received in time.")
elif responseFut.failed:
raise responseFut.error
else: else:
discard await response return responseFut.read
return response.read
proc handshakeImpl(peer: Peer,
sendFut: Future[void],
timeout: Duration,
HandshakeType: type): Future[HandshakeType] =
handshakeImpl(peer, sendFut, nextMsg(peer, HandshakeType), timeout)
macro handshake*(peer: Peer, timeout: untyped, sendCall: untyped): untyped = macro handshake*(peer: Peer, timeout: untyped, sendCall: untyped): untyped =
let let
@ -1127,16 +1150,18 @@ macro handshake*(peer: Peer, timeout: untyped, sendCall: untyped): untyped =
result = newCall(bindSym"handshakeImpl", peer, sendCall, timeout, msgType) result = newCall(bindSym"handshakeImpl", peer, sendCall, timeout, msgType)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.} = proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
if peer.connectionState notin {Disconnecting, Disconnected}: if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting peer.connectionState = Disconnecting
if notifyOtherPeer and not peer.transport.closed:
var fut = peer.sendDisconnectMsg(reason)
yield fut
if fut.failed:
warn "Failed to delived disconnect message", peer
try: try:
# TODO: investigate the failure here
if false and notifyOtherPeer and not peer.transport.closed:
await peer.sendDisconnectMsg(reason)
finally:
if not peer.dispatcher.isNil: if not peer.dispatcher.isNil:
await callDisconnectHandlers(peer, reason) await callDisconnectHandlers(peer, reason)
finally:
logDisconnectedPeer peer logDisconnectedPeer peer
peer.connectionState = Disconnected peer.connectionState = Disconnected
removePeer(peer.network, peer) removePeer(peer.network, peer)
@ -1198,9 +1223,8 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} =
messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} = messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} =
if messageProcessingLoop.failed: if messageProcessingLoop.failed:
error "dispatchMessages failed", error "dispatchMessages failed", err = messageProcessingLoop.error.msg
err = messageProcessingLoop.error.msg asyncDiscard peer.disconnect(ClientQuitting)
asyncCheck peer.disconnect(ClientQuitting)
# The handshake may involve multiple async steps, so we wait # The handshake may involve multiple async steps, so we wait
# here for all of them to finish. # here for all of them to finish.
@ -1216,7 +1240,7 @@ template `^`(arr): auto =
proc check(status: AuthStatus) = proc check(status: AuthStatus) =
if status != AuthStatus.Success: if status != AuthStatus.Success:
raise newException(Exception, "Error: " & $status) raise newException(CatchableError, "Error: " & $status)
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte], proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
p: Peer) = p: Peer) =
@ -1288,13 +1312,18 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
# if handshake.remoteHPubkey != remote.node.pubKey: # if handshake.remoteHPubkey != remote.node.pubKey:
# raise newException(Exception, "Remote pubkey is wrong") # raise newException(Exception, "Remote pubkey is wrong")
logConnectedPeer result logConnectedPeer result
asyncCheck result.hello(handshake.getVersion(),
var sendHelloFut = result.hello(
handshake.getVersion(),
node.clientId, node.clientId,
node.capabilities, node.capabilities,
uint(node.address.tcpPort), uint(node.address.tcpPort),
node.keys.pubkey.getRaw()) node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(devp2p.hello) var response = await result.handshakeImpl(
sendHelloFut,
result.waitSingleMsg(devp2p.hello),
10.seconds)
if not validatePubKeyInHello(response, remote.node.pubKey): if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care? warn "Remote nodeId is not its public key" # XXX: Do we care?
@ -1362,11 +1391,19 @@ proc rlpxAccept*(node: EthereumNode,
let listenPort = transport.localAddress().port let listenPort = transport.localAddress().port
logAcceptedPeer result logAcceptedPeer result
await result.hello(result.baseProtocolVersion, node.clientId,
node.capabilities, listenPort.uint, var sendHelloFut = result.hello(
result.baseProtocolVersion,
node.clientId,
node.capabilities,
listenPort.uint,
node.keys.pubkey.getRaw()) node.keys.pubkey.getRaw())
var response = await result.waitSingleMsg(devp2p.hello) var response = await result.handshakeImpl(
sendHelloFut,
result.waitSingleMsg(devp2p.hello),
10.seconds)
if not validatePubKeyInHello(response, handshake.remoteHPubkey): if not validatePubKeyInHello(response, handshake.remoteHPubkey):
warn "A Remote nodeId is not its public key" # XXX: Do we care? warn "A Remote nodeId is not its public key" # XXX: Do we care?

View File

@ -46,13 +46,13 @@ p2pProtocol eth(version = protocolVersion,
chain = network.chain chain = network.chain
bestBlock = chain.getBestBlockHeader bestBlock = chain.getBestBlockHeader
await peer.status(protocolVersion, let m = await peer.handhake(timeout = 10.seconds,
status(protocolVersion,
network.networkId, network.networkId,
bestBlock.difficulty, bestBlock.difficulty,
bestBlock.blockHash, bestBlock.blockHash,
chain.genesisHash) chain.genesisHash))
let m = await peer.nextMsg(eth.status)
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash: if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
trace "suitable peer", peer trace "suitable peer", peer
else: else:

View File

@ -208,7 +208,7 @@ p2pProtocol les(version = lesVersion,
lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType) lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType)
let let
s = await peer.nextMsg(les.status) s = await peer.handshake(timeout = 10.seconds, status(lesProperties))
peerNetworkId = s.values.getRequiredValue(keyNetworkId, uint) peerNetworkId = s.values.getRequiredValue(keyNetworkId, uint)
peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash) peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash)
peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint) peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint)