From f8bdec88c9afa1559208f67a5ccb9cfebeece1da Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 11 Jun 2019 12:46:26 +0200 Subject: [PATCH 1/4] Rework duplicate connections check and fix #36 --- eth/p2p.nim | 8 +++----- eth/p2p/peer_pool.nim | 23 +++++++++-------------- eth/p2p/rlpx.nim | 10 ++++++++++ 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/eth/p2p.nim b/eth/p2p.nim index 705d590..8055d7a 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -70,12 +70,10 @@ proc processIncoming(server: StreamServer, yield peerfut if not peerfut.failed: let peer = peerfut.read() + trace "Connection established (incoming)", peer if node.peerPool != nil: - if not node.peerPool.addPeer(peer): - # In case an outgoing connection was added in the meanwhile or a - # malicious peer opens multiple connections - debug "Disconnecting peer (incoming)", reason = AlreadyConnected - await peer.disconnect(AlreadyConnected) + node.peerPool.connectingNodes.excl(peer.remote) + node.peerPool.addPeer(peer) proc listeningAddress*(node: EthereumNode): ENode = return initENode(node.keys.pubKey, node.address) diff --git a/eth/p2p/peer_pool.nim b/eth/p2p/peer_pool.nim index 328fdab..ce3f86c 100644 --- a/eth/p2p/peer_pool.nim +++ b/eth/p2p/peer_pool.nim @@ -107,24 +107,19 @@ proc getRandomBootnode(p: PeerPool): Option[Node] = if p.discovery.bootstrapNodes.len != 0: result = option(p.discovery.bootstrapNodes.rand()) -proc addPeer*(pool: PeerPool, peer: Peer): bool = - if peer.remote notin pool.connectedNodes: - pool.connectedNodes[peer.remote] = peer - for o in pool.observers.values: - if not o.onPeerConnected.isNil: - if o.protocol.isNil or peer.supports(o.protocol): - o.onPeerConnected(peer) - return true - else: return false +proc addPeer*(pool: PeerPool, peer: Peer) = + doAssert(peer.remote notin pool.connectedNodes) + pool.connectedNodes[peer.remote] = peer + for o in pool.observers.values: + if not o.onPeerConnected.isNil: + if o.protocol.isNil or peer.supports(o.protocol): + o.onPeerConnected(peer) proc connectToNode*(p: PeerPool, n: Node) {.async.} = let peer = await p.connect(n) if not peer.isNil: - trace "Connection established", peer - if not p.addPeer(peer): - # In case an incoming connection was added in the meanwhile - trace "Disconnecting peer (outgoing)", reason = AlreadyConnected - await peer.disconnect(AlreadyConnected) + trace "Connection established (outgoing)", peer + p.addPeer(peer) proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = for node in nodes: diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index 0acf941..051a564 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -1480,6 +1480,16 @@ proc rlpxAccept*(node: EthereumNode, udpPort: remote.port) result.remote = newNode(initEnode(handshake.remoteHPubkey, address)) + # In case there is an outgoing connection started with this peer we give + # precedence to that one and we disconnect here with `AlreadyConnected` + if result.remote in node.peerPool.connectedNodes or + result.remote in node.peerPool.connectingNodes: + trace "Duplicate connection in rlpxAccept" + raisePeerDisconnected("Peer already connecting or connected", + AlreadyConnected) + + node.peerPool.connectingNodes.incl(result.remote) + await postHelloSteps(result, response) ok = true except PeerDisconnected as e: From b44675eda5d0e8e3c2d9486b9003c8d43346b7b7 Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 11 Jun 2019 14:38:02 +0200 Subject: [PATCH 2/4] Same error handling for rlpxAccept as for rlpxConnect --- eth/p2p.nim | 6 ++---- eth/p2p/rlpx.nim | 29 +++++++++++++++++------------ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/eth/p2p.nim b/eth/p2p.nim index 8055d7a..d78c570 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -66,10 +66,8 @@ proc newEthereumNode*(keys: KeyPair, proc processIncoming(server: StreamServer, remote: StreamTransport): Future[void] {.async, gcsafe.} = var node = getUserData[EthereumNode](server) - let peerfut = node.rlpxAccept(remote) - yield peerfut - if not peerfut.failed: - let peer = peerfut.read() + let peer = await node.rlpxAccept(remote) + if not peer.isNil: trace "Connection established (incoming)", peer if node.peerPool != nil: node.peerPool.connectingNodes.excl(peer.remote) diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index 051a564..cf1c4f0 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -1393,8 +1393,10 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = await postHelloSteps(result, response) ok = true except PeerDisconnected as e: - if e.reason != TooManyPeers: - debug "Unexpected disconnect during rlpxConnect", reason = e.reason + if e.reason == AlreadyConnected or e.reason == TooManyPeers: + trace "Disconnect during rlpxAccept", reason = e.reason + else: + debug "Unexpected disconnect during rlpxAccept", reason = e.reason except TransportIncompleteError: trace "Connection dropped in rlpxConnect", remote except UselessPeerError: @@ -1405,12 +1407,10 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = debug "Rlp error in rlpxConnect" except TransportOsError: trace "TransportOsError", err = getCurrentExceptionMsg() - except: - error "Exception in rlpxConnect", remote, + except CatchableError: + error "Unexpected exception in rlpxConnect", remote, exc = getCurrentException().name, err = getCurrentExceptionMsg() - result = nil - raise if not ok: if not isNil(result.transport): @@ -1493,7 +1493,7 @@ proc rlpxAccept*(node: EthereumNode, await postHelloSteps(result, response) ok = true except PeerDisconnected as e: - if e.reason == AlreadyConnected: + if e.reason == AlreadyConnected or e.reason == TooManyPeers: trace "Disconnect during rlpxAccept", reason = e.reason else: debug "Unexpected disconnect during rlpxAccept", reason = e.reason @@ -1501,16 +1501,21 @@ proc rlpxAccept*(node: EthereumNode, trace "Connection dropped in rlpxAccept", remote = result.remote except UselessPeerError: trace "Disconnecting useless peer", peer = result.remote - except: - error "Exception in rlpxAccept", - err = getCurrentExceptionMsg(), - stackTrace = getCurrentException().getStackTrace() + except RlpTypeMismatch: + # Some peers report capabilities with names longer than 3 chars. We ignore + # those for now. Maybe we should allow this though. + debug "Rlp error in rlpxAccept" + except TransportOsError: + trace "TransportOsError", err = getCurrentExceptionMsg() + except CatchableError: + error "Unexpected exception in rlpxAccept", + exc = getCurrentException().name, + err = getCurrentExceptionMsg() if not ok: if not isNil(result.transport): result.transport.close() result = nil - raise when isMainModule: From 9e29d2211f2e998af53b23d1dfeedefd5fc0c40a Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 11 Jun 2019 14:44:24 +0200 Subject: [PATCH 3/4] Fix compilation issue for shh_basic_client --- tests/p2p/shh_basic_client.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/p2p/shh_basic_client.nim b/tests/p2p/shh_basic_client.nim index 38e6dd7..98929e4 100644 --- a/tests/p2p/shh_basic_client.nim +++ b/tests/p2p/shh_basic_client.nim @@ -113,8 +113,8 @@ if config.main: else: netId = 15 -let keys = newKeyPair() -var node = newEthereumNode(keys, address, netId, nil, addAllCapabilities = false) +let keypair = newKeyPair() +var node = newEthereumNode(keypair, address, netId, nil, addAllCapabilities = false) node.addCapability Whisper # lets prepare some prearranged keypairs From 0966a4e9ca97f9fcb5f4bb7da6528ab569da89a0 Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 11 Jun 2019 15:31:00 +0200 Subject: [PATCH 4/4] Adjust deprecated use of sleepAsync --- eth/p2p.nim | 2 +- eth/p2p/peer_pool.nim | 4 ++-- eth/p2p/rlpx_protocols/whisper_protocol.nim | 4 ++-- tests/p2p/test_shh_connect.nim | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/eth/p2p.nim b/eth/p2p.nim index d78c570..df0755e 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -111,7 +111,7 @@ proc connectToNetwork*(node: EthereumNode, while node.peerPool.connectedNodes.len == 0: trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len - await sleepAsync(500) + await sleepAsync(500.milliseconds) proc stopListening*(node: EthereumNode) = node.listeningServer.stop() diff --git a/eth/p2p/peer_pool.nim b/eth/p2p/peer_pool.nim index ce3f86c..e73339c 100644 --- a/eth/p2p/peer_pool.nim +++ b/eth/p2p/peer_pool.nim @@ -8,7 +8,7 @@ import const lookupInterval = 5 - connectLoopSleepMs = 2000 + connectLoopSleep = chronos.milliseconds(2000) proc newPeerPool*(network: EthereumNode, networkId: uint, keyPair: KeyPair, @@ -179,7 +179,7 @@ proc run(p: PeerPool) {.async.} = if dropConnections: await p.stopAllPeers() - await sleepAsync(connectLoopSleepMs) + await sleepAsync(connectLoopSleep) proc start*(p: PeerPool) = if not p.running: diff --git a/eth/p2p/rlpx_protocols/whisper_protocol.nim b/eth/p2p/rlpx_protocols/whisper_protocol.nim index 0d4a824..c382fed 100644 --- a/eth/p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim @@ -29,8 +29,8 @@ const whisperVersionStr* = $whisperVersion defaultMinPow* = 0.2'f64 defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size - messageInterval* = 300 ## Interval at which messages are send to peers, in ms - pruneInterval* = 1000 ## Interval at which message queue is pruned, in ms + messageInterval* = chronos.milliseconds(300) ## Interval at which messages are send to peers, in ms + pruneInterval* = chronos.milliseconds(1000) ## Interval at which message queue is pruned, in ms type Hash* = MDigest[256] diff --git a/tests/p2p/test_shh_connect.nim b/tests/p2p/test_shh_connect.nim index 949bf38..592c45b 100644 --- a/tests/p2p/test_shh_connect.nim +++ b/tests/p2p/test_shh_connect.nim @@ -18,7 +18,7 @@ proc resetMessageQueues(nodes: varargs[EthereumNode]) = let bootENode = waitFor setupBootNode() let safeTTL = 5'u32 -let waitInterval = messageInterval + 150 +let waitInterval = messageInterval + 150.milliseconds var node1 = setupTestNode(Whisper) var node2 = setupTestNode(Whisper) @@ -283,7 +283,7 @@ suite "Whisper connections": await sleepAsync(waitInterval) check node1.protocolState(Whisper).queue.items.len == 10 - await sleepAsync(int((lowerTTL+1)*1000)) + await sleepAsync(milliseconds((lowerTTL+1)*1000)) check node1.protocolState(Whisper).queue.items.len == 0 check node2.protocolState(Whisper).queue.items.len == 0