mirror of https://github.com/status-im/nim-eth.git
Merge pull request #39 from kdeme/bug/fix-transport-leaks-#37
Bug/fix transport leaks #37
This commit is contained in:
commit
55e1eff4a9
|
@ -51,7 +51,7 @@ proc runP2pTests() =
|
||||||
"test_enode",
|
"test_enode",
|
||||||
"test_shh",
|
"test_shh",
|
||||||
"test_shh_connect",
|
"test_shh_connect",
|
||||||
"test_failing_handler",
|
"test_protocol_handlers",
|
||||||
]:
|
]:
|
||||||
runTest("tests/p2p/" & filename)
|
runTest("tests/p2p/" & filename)
|
||||||
|
|
||||||
|
|
|
@ -76,8 +76,6 @@ proc processIncoming(server: StreamServer,
|
||||||
# malicious peer opens multiple connections
|
# malicious peer opens multiple connections
|
||||||
debug "Disconnecting peer (incoming)", reason = AlreadyConnected
|
debug "Disconnecting peer (incoming)", reason = AlreadyConnected
|
||||||
await peer.disconnect(AlreadyConnected)
|
await peer.disconnect(AlreadyConnected)
|
||||||
else:
|
|
||||||
remote.close()
|
|
||||||
|
|
||||||
proc listeningAddress*(node: EthereumNode): ENode =
|
proc listeningAddress*(node: EthereumNode): ENode =
|
||||||
return initENode(node.keys.pubKey, node.address)
|
return initENode(node.keys.pubKey, node.address)
|
||||||
|
|
|
@ -511,22 +511,34 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
|
||||||
# handled a layer lower for clarity (and consistency), as also the actual
|
# handled a layer lower for clarity (and consistency), as also the actual
|
||||||
# message handler code as the TODO mentions already.
|
# message handler code as the TODO mentions already.
|
||||||
proc dispatchMessages*(peer: Peer) {.async.} =
|
proc dispatchMessages*(peer: Peer) {.async.} =
|
||||||
while true:
|
while peer.connectionState notin {Disconnecting, Disconnected}:
|
||||||
var msgId: int
|
var msgId: int
|
||||||
var msgData: Rlp
|
var msgData: Rlp
|
||||||
try:
|
try:
|
||||||
(msgId, msgData) = await peer.recvMsg()
|
(msgId, msgData) = await peer.recvMsg()
|
||||||
except TransportIncompleteError:
|
except TransportIncompleteError:
|
||||||
trace "Connection dropped, ending dispatchMessages loop", peer
|
# Note: Could also "Transport is already closed!" error occur? Might have
|
||||||
# This can happen during the rlpx connection setup or at any point after.
|
# to change here to the general TransportError.
|
||||||
# Because this code does not know, a disconnect needs to be done.
|
case peer.connectionState
|
||||||
await peer.disconnect(ClientQuitting)
|
of Connected:
|
||||||
|
# Dropped connection, still need to cleanup the peer.
|
||||||
|
# This could be seen as bad behaving peer.
|
||||||
|
trace "Dropped connection", peer
|
||||||
|
await peer.disconnect(ClientQuitting, false)
|
||||||
|
return
|
||||||
|
of Disconnecting, Disconnected:
|
||||||
|
# Graceful disconnect, can still cause TransportIncompleteError as it
|
||||||
|
# could be that this loop was waiting at recvMsg().
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# Connection dropped while `Connecting` (in rlpxConnect/rlpxAccept).
|
||||||
|
return
|
||||||
|
except PeerDisconnected:
|
||||||
return
|
return
|
||||||
|
|
||||||
if msgId == 1: # p2p.disconnect
|
if msgId == 1: # p2p.disconnect
|
||||||
await peer.transport.closeWait()
|
|
||||||
let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason
|
let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason
|
||||||
await peer.disconnect(reason)
|
await peer.disconnect(reason, false)
|
||||||
break
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -1176,18 +1188,29 @@ macro handshake*(peer: Peer, timeout: untyped, sendCall: untyped): untyped =
|
||||||
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
||||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||||
peer.connectionState = Disconnecting
|
peer.connectionState = Disconnecting
|
||||||
if notifyOtherPeer and not peer.transport.closed:
|
# Do this first so sub-protocols have time to clean up and stop sending
|
||||||
var fut = peer.sendDisconnectMsg(reason)
|
# before this node closes transport to remote peer
|
||||||
yield fut
|
|
||||||
if fut.failed:
|
|
||||||
debug "Failed to delived disconnect message", peer
|
|
||||||
|
|
||||||
if not peer.dispatcher.isNil:
|
if not peer.dispatcher.isNil:
|
||||||
# In case of `CatchableError` in any of the handlers, this will be logged.
|
# In case of `CatchableError` in any of the handlers, this will be logged.
|
||||||
# Other handlers will still execute.
|
# Other handlers will still execute.
|
||||||
# In case of `Defect` in any of the handlers, program will quit.
|
# In case of `Defect` in any of the handlers, program will quit.
|
||||||
traceAwaitErrors callDisconnectHandlers(peer, reason)
|
traceAwaitErrors callDisconnectHandlers(peer, reason)
|
||||||
|
|
||||||
|
if notifyOtherPeer and not peer.transport.closed:
|
||||||
|
var fut = peer.sendDisconnectMsg(reason)
|
||||||
|
yield fut
|
||||||
|
if fut.failed:
|
||||||
|
debug "Failed to deliver disconnect message", peer
|
||||||
|
|
||||||
|
proc waitAndClose(peer: Peer, time: Duration) {.async.} =
|
||||||
|
await sleepAsync(time)
|
||||||
|
await peer.transport.closeWait()
|
||||||
|
|
||||||
|
# Give the peer a chance to disconnect
|
||||||
|
traceAsyncErrors peer.waitAndClose(2.seconds)
|
||||||
|
elif not peer.transport.closed:
|
||||||
|
peer.transport.close()
|
||||||
|
|
||||||
logDisconnectedPeer peer
|
logDisconnectedPeer peer
|
||||||
peer.connectionState = Disconnected
|
peer.connectionState = Disconnected
|
||||||
removePeer(peer.network, peer)
|
removePeer(peer.network, peer)
|
||||||
|
|
|
@ -39,8 +39,20 @@ p2pProtocol xyz(version = 1,
|
||||||
if true:
|
if true:
|
||||||
raise newException(CatchableError, "Fake xyz exception")
|
raise newException(CatchableError, "Fake xyz exception")
|
||||||
|
|
||||||
|
p2pProtocol hah(version = 1,
|
||||||
|
shortName = "hah",
|
||||||
|
networkState = network):
|
||||||
|
|
||||||
|
onPeerConnected do (peer: Peer):
|
||||||
|
if true:
|
||||||
|
raise newException(UselessPeerError, "Fake hah exception")
|
||||||
|
peer.networkState.count += 1
|
||||||
|
|
||||||
|
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
|
||||||
|
peer.networkState.count -= 1
|
||||||
|
|
||||||
suite "Testing protocol handlers":
|
suite "Testing protocol handlers":
|
||||||
asyncTest "Failing disconnect handler":
|
asyncTest "Failing disconnection handler":
|
||||||
let bootENode = waitFor setupBootNode()
|
let bootENode = waitFor setupBootNode()
|
||||||
var node1 = setupTestNode(abc, xyz)
|
var node1 = setupTestNode(abc, xyz)
|
||||||
var node2 = setupTestNode(abc, xyz)
|
var node2 = setupTestNode(abc, xyz)
|
||||||
|
@ -60,3 +72,14 @@ suite "Testing protocol handlers":
|
||||||
# handlers, each handler still ran
|
# handlers, each handler still ran
|
||||||
node1.protocolState(abc).count == 0
|
node1.protocolState(abc).count == 0
|
||||||
node1.protocolState(xyz).count == 0
|
node1.protocolState(xyz).count == 0
|
||||||
|
|
||||||
|
asyncTest "Failing connection handler":
|
||||||
|
var node1 = setupTestNode(hah)
|
||||||
|
var node2 = setupTestNode(hah)
|
||||||
|
node2.startListening()
|
||||||
|
let peer = waitFor node1.rlpxConnect(newNode(initENode(node2.keys.pubKey,
|
||||||
|
node2.address)))
|
||||||
|
check:
|
||||||
|
peer.isNil == true
|
||||||
|
# To check if the disconnection handler did not run
|
||||||
|
node1.protocolState(hah).count == 0
|
Loading…
Reference in New Issue