Merge pull request #61 from kdeme/rlpx-changes

Rlpx changes
This commit is contained in:
kdeme 2019-06-12 00:47:04 -07:00 committed by GitHub
commit a12e01fb90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 44 deletions

View File

@ -66,16 +66,12 @@ proc newEthereumNode*(keys: KeyPair,
proc processIncoming(server: StreamServer, proc processIncoming(server: StreamServer,
remote: StreamTransport): Future[void] {.async, gcsafe.} = remote: StreamTransport): Future[void] {.async, gcsafe.} =
var node = getUserData[EthereumNode](server) var node = getUserData[EthereumNode](server)
let peerfut = node.rlpxAccept(remote) let peer = await node.rlpxAccept(remote)
yield peerfut if not peer.isNil:
if not peerfut.failed: trace "Connection established (incoming)", peer
let peer = peerfut.read()
if node.peerPool != nil: if node.peerPool != nil:
if not node.peerPool.addPeer(peer): node.peerPool.connectingNodes.excl(peer.remote)
# In case an outgoing connection was added in the meanwhile or a node.peerPool.addPeer(peer)
# malicious peer opens multiple connections
debug "Disconnecting peer (incoming)", reason = AlreadyConnected
await peer.disconnect(AlreadyConnected)
proc listeningAddress*(node: EthereumNode): ENode = proc listeningAddress*(node: EthereumNode): ENode =
return initENode(node.keys.pubKey, node.address) return initENode(node.keys.pubKey, node.address)
@ -115,7 +111,7 @@ proc connectToNetwork*(node: EthereumNode,
while node.peerPool.connectedNodes.len == 0: while node.peerPool.connectedNodes.len == 0:
trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len
await sleepAsync(500) await sleepAsync(500.milliseconds)
proc stopListening*(node: EthereumNode) = proc stopListening*(node: EthereumNode) =
node.listeningServer.stop() node.listeningServer.stop()

View File

@ -8,7 +8,7 @@ import
const const
lookupInterval = 5 lookupInterval = 5
connectLoopSleepMs = 2000 connectLoopSleep = chronos.milliseconds(2000)
proc newPeerPool*(network: EthereumNode, proc newPeerPool*(network: EthereumNode,
networkId: uint, keyPair: KeyPair, networkId: uint, keyPair: KeyPair,
@ -107,24 +107,19 @@ proc getRandomBootnode(p: PeerPool): Option[Node] =
if p.discovery.bootstrapNodes.len != 0: if p.discovery.bootstrapNodes.len != 0:
result = option(p.discovery.bootstrapNodes.rand()) result = option(p.discovery.bootstrapNodes.rand())
proc addPeer*(pool: PeerPool, peer: Peer): bool = proc addPeer*(pool: PeerPool, peer: Peer) =
if peer.remote notin pool.connectedNodes: doAssert(peer.remote notin pool.connectedNodes)
pool.connectedNodes[peer.remote] = peer pool.connectedNodes[peer.remote] = peer
for o in pool.observers.values: for o in pool.observers.values:
if not o.onPeerConnected.isNil: if not o.onPeerConnected.isNil:
if o.protocol.isNil or peer.supports(o.protocol): if o.protocol.isNil or peer.supports(o.protocol):
o.onPeerConnected(peer) o.onPeerConnected(peer)
return true
else: return false
proc connectToNode*(p: PeerPool, n: Node) {.async.} = proc connectToNode*(p: PeerPool, n: Node) {.async.} =
let peer = await p.connect(n) let peer = await p.connect(n)
if not peer.isNil: if not peer.isNil:
trace "Connection established", peer trace "Connection established (outgoing)", peer
if not p.addPeer(peer): p.addPeer(peer)
# In case an incoming connection was added in the meanwhile
trace "Disconnecting peer (outgoing)", reason = AlreadyConnected
await peer.disconnect(AlreadyConnected)
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
for node in nodes: for node in nodes:
@ -184,7 +179,7 @@ proc run(p: PeerPool) {.async.} =
if dropConnections: if dropConnections:
await p.stopAllPeers() await p.stopAllPeers()
await sleepAsync(connectLoopSleepMs) await sleepAsync(connectLoopSleep)
proc start*(p: PeerPool) = proc start*(p: PeerPool) =
if not p.running: if not p.running:

View File

@ -1393,8 +1393,10 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
await postHelloSteps(result, response) await postHelloSteps(result, response)
ok = true ok = true
except PeerDisconnected as e: except PeerDisconnected as e:
if e.reason != TooManyPeers: if e.reason == AlreadyConnected or e.reason == TooManyPeers:
debug "Unexpected disconnect during rlpxConnect", reason = e.reason trace "Disconnect during rlpxAccept", reason = e.reason
else:
debug "Unexpected disconnect during rlpxAccept", reason = e.reason
except TransportIncompleteError: except TransportIncompleteError:
trace "Connection dropped in rlpxConnect", remote trace "Connection dropped in rlpxConnect", remote
except UselessPeerError: except UselessPeerError:
@ -1405,12 +1407,10 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
debug "Rlp error in rlpxConnect" debug "Rlp error in rlpxConnect"
except TransportOsError: except TransportOsError:
trace "TransportOsError", err = getCurrentExceptionMsg() trace "TransportOsError", err = getCurrentExceptionMsg()
except: except CatchableError:
error "Exception in rlpxConnect", remote, error "Unexpected exception in rlpxConnect", remote,
exc = getCurrentException().name, exc = getCurrentException().name,
err = getCurrentExceptionMsg() err = getCurrentExceptionMsg()
result = nil
raise
if not ok: if not ok:
if not isNil(result.transport): if not isNil(result.transport):
@ -1480,10 +1480,20 @@ proc rlpxAccept*(node: EthereumNode,
udpPort: remote.port) udpPort: remote.port)
result.remote = newNode(initEnode(handshake.remoteHPubkey, address)) result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
# In case there is an outgoing connection started with this peer we give
# precedence to that one and we disconnect here with `AlreadyConnected`
if result.remote in node.peerPool.connectedNodes or
result.remote in node.peerPool.connectingNodes:
trace "Duplicate connection in rlpxAccept"
raisePeerDisconnected("Peer already connecting or connected",
AlreadyConnected)
node.peerPool.connectingNodes.incl(result.remote)
await postHelloSteps(result, response) await postHelloSteps(result, response)
ok = true ok = true
except PeerDisconnected as e: except PeerDisconnected as e:
if e.reason == AlreadyConnected: if e.reason == AlreadyConnected or e.reason == TooManyPeers:
trace "Disconnect during rlpxAccept", reason = e.reason trace "Disconnect during rlpxAccept", reason = e.reason
else: else:
debug "Unexpected disconnect during rlpxAccept", reason = e.reason debug "Unexpected disconnect during rlpxAccept", reason = e.reason
@ -1491,16 +1501,21 @@ proc rlpxAccept*(node: EthereumNode,
trace "Connection dropped in rlpxAccept", remote = result.remote trace "Connection dropped in rlpxAccept", remote = result.remote
except UselessPeerError: except UselessPeerError:
trace "Disconnecting useless peer", peer = result.remote trace "Disconnecting useless peer", peer = result.remote
except: except RlpTypeMismatch:
error "Exception in rlpxAccept", # Some peers report capabilities with names longer than 3 chars. We ignore
err = getCurrentExceptionMsg(), # those for now. Maybe we should allow this though.
stackTrace = getCurrentException().getStackTrace() debug "Rlp error in rlpxAccept"
except TransportOsError:
trace "TransportOsError", err = getCurrentExceptionMsg()
except CatchableError:
error "Unexpected exception in rlpxAccept",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
if not ok: if not ok:
if not isNil(result.transport): if not isNil(result.transport):
result.transport.close() result.transport.close()
result = nil result = nil
raise
when isMainModule: when isMainModule:

View File

@ -29,8 +29,8 @@ const
whisperVersionStr* = $whisperVersion whisperVersionStr* = $whisperVersion
defaultMinPow* = 0.2'f64 defaultMinPow* = 0.2'f64
defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size defaultMaxMsgSize* = 1024'u32 * 1024'u32 # * 10 # should be no higher than max RLPx size
messageInterval* = 300 ## Interval at which messages are send to peers, in ms messageInterval* = chronos.milliseconds(300) ## Interval at which messages are send to peers, in ms
pruneInterval* = 1000 ## Interval at which message queue is pruned, in ms pruneInterval* = chronos.milliseconds(1000) ## Interval at which message queue is pruned, in ms
type type
Hash* = MDigest[256] Hash* = MDigest[256]

View File

@ -113,8 +113,8 @@ if config.main:
else: else:
netId = 15 netId = 15
let keys = newKeyPair() let keypair = newKeyPair()
var node = newEthereumNode(keys, address, netId, nil, addAllCapabilities = false) var node = newEthereumNode(keypair, address, netId, nil, addAllCapabilities = false)
node.addCapability Whisper node.addCapability Whisper
# lets prepare some prearranged keypairs # lets prepare some prearranged keypairs

View File

@ -18,7 +18,7 @@ proc resetMessageQueues(nodes: varargs[EthereumNode]) =
let bootENode = waitFor setupBootNode() let bootENode = waitFor setupBootNode()
let safeTTL = 5'u32 let safeTTL = 5'u32
let waitInterval = messageInterval + 150 let waitInterval = messageInterval + 150.milliseconds
var node1 = setupTestNode(Whisper) var node1 = setupTestNode(Whisper)
var node2 = setupTestNode(Whisper) var node2 = setupTestNode(Whisper)
@ -283,7 +283,7 @@ suite "Whisper connections":
await sleepAsync(waitInterval) await sleepAsync(waitInterval)
check node1.protocolState(Whisper).queue.items.len == 10 check node1.protocolState(Whisper).queue.items.len == 10
await sleepAsync(int((lowerTTL+1)*1000)) await sleepAsync(milliseconds((lowerTTL+1)*1000))
check node1.protocolState(Whisper).queue.items.len == 0 check node1.protocolState(Whisper).queue.items.len == 0
check node2.protocolState(Whisper).queue.items.len == 0 check node2.protocolState(Whisper).queue.items.len == 0