mirror of
https://github.com/status-im/nim-eth.git
synced 2025-01-23 12:59:05 +00:00
Fix transport leaks + handle disconnects in message loop
This commit is contained in:
parent
14513b3bd8
commit
6ba61488ff
@ -511,22 +511,34 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
|
||||
# handled a layer lower for clarity (and consistency), as also the actual
|
||||
# message handler code as the TODO mentions already.
|
||||
proc dispatchMessages*(peer: Peer) {.async.} =
|
||||
while true:
|
||||
while peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
var msgId: int
|
||||
var msgData: Rlp
|
||||
try:
|
||||
(msgId, msgData) = await peer.recvMsg()
|
||||
except TransportIncompleteError:
|
||||
trace "Connection dropped, ending dispatchMessages loop", peer
|
||||
# This can happen during the rlpx connection setup or at any point after.
|
||||
# Because this code does not know, a disconnect needs to be done.
|
||||
await peer.disconnect(ClientQuitting)
|
||||
# Note: Could also "Transport is already closed!" error occur? Might have
|
||||
# to change here to the general TransportError.
|
||||
case peer.connectionState
|
||||
of Connected:
|
||||
# Dropped connection, still need to cleanup the peer.
|
||||
# This could be seen as bad behaving peer.
|
||||
trace "Dropped connection", peer
|
||||
await peer.disconnect(ClientQuitting, false)
|
||||
return
|
||||
of Disconnecting, Disconnected:
|
||||
# Graceful disconnect, can still cause TransportIncompleteError as it
|
||||
# could be that this loop was waiting at recvMsg().
|
||||
return
|
||||
else:
|
||||
# Connection dropped while `Connecting` (in rlpxConnect/rlpxAccept).
|
||||
return
|
||||
except PeerDisconnected:
|
||||
return
|
||||
|
||||
if msgId == 1: # p2p.disconnect
|
||||
await peer.transport.closeWait()
|
||||
let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason
|
||||
await peer.disconnect(reason)
|
||||
await peer.disconnect(reason, false)
|
||||
break
|
||||
|
||||
try:
|
||||
@ -1176,18 +1188,29 @@ macro handshake*(peer: Peer, timeout: untyped, sendCall: untyped): untyped =
|
||||
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
peer.connectionState = Disconnecting
|
||||
if notifyOtherPeer and not peer.transport.closed:
|
||||
var fut = peer.sendDisconnectMsg(reason)
|
||||
yield fut
|
||||
if fut.failed:
|
||||
debug "Failed to delived disconnect message", peer
|
||||
|
||||
# Do this first so sub-protocols have time to clean up and stop sending
|
||||
# before this node closes transport to remote peer
|
||||
if not peer.dispatcher.isNil:
|
||||
# In case of `CatchableError` in any of the handlers, this will be logged.
|
||||
# Other handlers will still execute.
|
||||
# In case of `Defect` in any of the handlers, program will quit.
|
||||
traceAwaitErrors callDisconnectHandlers(peer, reason)
|
||||
|
||||
if notifyOtherPeer and not peer.transport.closed:
|
||||
var fut = peer.sendDisconnectMsg(reason)
|
||||
yield fut
|
||||
if fut.failed:
|
||||
debug "Failed to deliver disconnect message", peer
|
||||
|
||||
proc waitAndClose(peer: Peer, time: Duration) {.async.} =
|
||||
await sleepAsync(time)
|
||||
await peer.transport.closeWait()
|
||||
|
||||
# Give the peer a chance to disconnect
|
||||
await peer.waitAndClose(2.seconds)
|
||||
elif not peer.transport.closed:
|
||||
await peer.transport.closeWait()
|
||||
|
||||
logDisconnectedPeer peer
|
||||
peer.connectionState = Disconnected
|
||||
removePeer(peer.network, peer)
|
||||
|
Loading…
x
Reference in New Issue
Block a user