diff --git a/eth/p2p/auth.nim b/eth/p2p/auth.nim index a76b3b1..0360c35 100644 --- a/eth/p2p/auth.nim +++ b/eth/p2p/auth.nim @@ -95,12 +95,11 @@ proc `xor`[N: static int](a, b: array[N, byte]): array[N, byte] = proc mapErrTo[T, E](r: Result[T, E], v: static AuthError): AuthResult[T] = r.mapErr(proc (e: E): AuthError = v) -proc tryInit*( +proc init*( T: type Handshake, rng: var BrHmacDrbgContext, host: KeyPair, flags: set[HandshakeFlag] = {Initiator}, - version: uint8 = SupportedRlpxVersion): AuthResult[T] = + version: uint8 = SupportedRlpxVersion): T = ## Create new `Handshake` object. - var initiatorNonce: Nonce responderNonce: Nonce @@ -114,7 +113,7 @@ proc tryInit*( expectedLength = AuthMessageV4Length brHmacDrbgGenerate(rng, responderNonce) - return ok(T( + return T( version: version, flags: flags, host: host, @@ -122,7 +121,7 @@ proc tryInit*( initiatorNonce: initiatorNonce, responderNonce: responderNonce, expectedLength: expectedLength - )) + ) proc authMessagePreEIP8(h: var Handshake, rng: var BrHmacDrbgContext, diff --git a/eth/p2p/discovery.nim b/eth/p2p/discovery.nim index 52f3607..eddb43e 100644 --- a/eth/p2p/discovery.nim +++ b/eth/p2p/discovery.nim @@ -44,12 +44,14 @@ type cmdPong = 2 cmdFindNode = 3 cmdNeighbours = 4 + cmdENRRequest = 5 + cmdENRResponse = 6 DiscProtocolError* = object of CatchableError DiscResult*[T] = Result[T, cstring] -const MinListLen: array[CommandId, int] = [4, 3, 2, 2] +const MinListLen: array[CommandId, int] = [4, 3, 2, 2, 1, 2] proc append*(w: var RlpWriter, a: IpAddress) = case a.family @@ -267,6 +269,9 @@ proc receive*(d: DiscoveryProtocol, a: Address, msg: openArray[byte]) d.recvNeighbours(node, payload) of cmdFindNode: d.recvFindNode(node, payload) + of cmdENRRequest, cmdENRResponse: + # TODO: Implement EIP-868 + discard else: trace "Received msg already expired", cmdId, a else: diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index d536d74..8f694ef 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -1212,22 +1212,19 @@ template baseProtocolVersion(peer: Peer): uint = proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = initTracing(devp2pInfo, node.protocols) - new result - result.network = node - result.remote = remote - + let peer = Peer(remote: remote, network: node) let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort) var ok = false try: - result.transport = await connect(ta) - var handshake = Handshake.tryInit( - node.rng[], node.keys, {Initiator, EIP8}, node.baseProtocolVersion).tryGet() + peer.transport = await connect(ta) + var handshake = Handshake.init( + node.rng[], node.keys, {Initiator, EIP8}, node.baseProtocolVersion) var authMsg: array[AuthMessageMaxEIP8, byte] var authMsgLen = 0 authMessage( handshake, node.rng[], remote.node.pubkey, authMsg, authMsgLen).tryGet() - var res = await result.transport.write(addr authMsg[0], authMsgLen) + var res = await peer.transport.write(addr authMsg[0], authMsgLen) if res != authMsgLen: raisePeerDisconnected("Unexpected disconnect while authenticating", TcpError) @@ -1236,33 +1233,41 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = var ackMsg = newSeqOfCap[byte](1024) ackMsg.setLen(initialSize) - await result.transport.readExactly(addr ackMsg[0], len(ackMsg)) + # TODO: Should we not set some timeouts on these `readExactly`s? + await peer.transport.readExactly(addr ackMsg[0], len(ackMsg)) var ret = handshake.decodeAckMessage(ackMsg) if ret.isErr and ret.error == AuthError.IncompleteError: ackMsg.setLen(handshake.expectedLength) - await result.transport.readExactly(addr ackMsg[initialSize], + await peer.transport.readExactly(addr ackMsg[initialSize], len(ackMsg) - initialSize) ret = handshake.decodeAckMessage(ackMsg) - ret.tryGet() # for the raise! - node.checkSnappySupport(handshake, result) - initSecretState(handshake, ^authMsg, ackMsg, result) + if ret.isErr(): + debug "rlpxConnect handshake error", error = ret.error + if not isNil(peer.transport): + peer.transport.close() + return nil + + ret.get() + + node.checkSnappySupport(handshake, peer) + initSecretState(handshake, ^authMsg, ackMsg, peer) # if handshake.remoteHPubkey != remote.node.pubKey: # raise newException(Exception, "Remote pubkey is wrong") - logConnectedPeer result + logConnectedPeer peer - var sendHelloFut = result.hello( + var sendHelloFut = peer.hello( handshake.getVersion(), node.clientId, node.capabilities, uint(node.address.tcpPort), node.keys.pubkey.toRaw()) - var response = await result.handshakeImpl( + var response = await peer.handshakeImpl( sendHelloFut, - result.waitSingleMsg(DevP2P.hello), + peer.waitSingleMsg(DevP2P.hello), 10.seconds) if not validatePubKeyInHello(response, remote.node.pubkey): @@ -1271,7 +1276,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = trace "DevP2P handshake completed", peer = remote, clientId = response.clientId - await postHelloSteps(result, response) + await postHelloSteps(peer, response) ok = true trace "Peer fully connected", peer = remote, clientId = response.clientId except PeerDisconnected as e: @@ -1296,39 +1301,47 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = err = e.msg if not ok: - if not isNil(result.transport): - result.transport.close() - result = nil + if not isNil(peer.transport): + peer.transport.close() + return nil + else: + return peer proc rlpxAccept*(node: EthereumNode, transport: StreamTransport): Future[Peer] {.async.} = initTracing(devp2pInfo, node.protocols) - new result - result.transport = transport - result.network = node - - var handshake = - Handshake.tryInit(node.rng[], node.keys, {auth.Responder}).tryGet - + let peer = Peer(transport: transport, network: node) + var handshake = Handshake.init(node.rng[], node.keys, {auth.Responder}) var ok = false try: let initialSize = handshake.expectedLength var authMsg = newSeqOfCap[byte](1024) authMsg.setLen(initialSize) + # TODO: Should we not set some timeouts on these `readExactly`s? await transport.readExactly(addr authMsg[0], len(authMsg)) var ret = handshake.decodeAuthMessage(authMsg) if ret.isErr and ret.error == AuthError.IncompleteError: - # Eip8 auth message is likely + # Eip8 auth message is possible, but not likely authMsg.setLen(handshake.expectedLength) await transport.readExactly(addr authMsg[initialSize], len(authMsg) - initialSize) ret = handshake.decodeAuthMessage(authMsg) - ret.tryGet() # for the raise! - node.checkSnappySupport(handshake, result) - handshake.version = uint8(result.baseProtocolVersion) + if ret.isErr(): + # It is likely that errors on the handshake Auth is just garbage arriving + # on the TCP port as it is the first data on the incoming connection, + # hence log them as trace. + trace "rlpxAccept handshake error", error = ret.error + if not isNil(peer.transport): + peer.transport.close() + return nil + + ret.get() + + node.checkSnappySupport(handshake, peer) + handshake.version = uint8(peer.baseProtocolVersion) var ackMsg: array[AckMessageMaxEIP8, byte] var ackMsgLen: int @@ -1338,22 +1351,22 @@ proc rlpxAccept*(node: EthereumNode, raisePeerDisconnected("Unexpected disconnect while authenticating", TcpError) - initSecretState(handshake, authMsg, ^ackMsg, result) + initSecretState(handshake, authMsg, ^ackMsg, peer) let listenPort = transport.localAddress().port - logAcceptedPeer result + logAcceptedPeer peer - var sendHelloFut = result.hello( - result.baseProtocolVersion, + var sendHelloFut = peer.hello( + peer.baseProtocolVersion, node.clientId, node.capabilities, listenPort.uint, node.keys.pubkey.toRaw()) - var response = await result.handshakeImpl( + var response = await peer.handshakeImpl( sendHelloFut, - result.waitSingleMsg(DevP2P.hello), + peer.waitSingleMsg(DevP2P.hello), 10.seconds) trace "Received Hello", version=response.version, id=response.clientId @@ -1364,36 +1377,36 @@ proc rlpxAccept*(node: EthereumNode, let remote = transport.remoteAddress() let address = Address(ip: remote.address, tcpPort: remote.port, udpPort: remote.port) - result.remote = newNode( + peer.remote = newNode( ENode(pubkey: handshake.remoteHPubkey, address: address)) - trace "devp2p handshake completed", peer = result.remote, + trace "devp2p handshake completed", peer = peer.remote, clientId = response.clientId # In case there is an outgoing connection started with this peer we give # precedence to that one and we disconnect here with `AlreadyConnected` - if result.remote in node.peerPool.connectedNodes or - result.remote in node.peerPool.connectingNodes: + if peer.remote in node.peerPool.connectedNodes or + peer.remote in node.peerPool.connectingNodes: trace "Duplicate connection in rlpxAccept" raisePeerDisconnected("Peer already connecting or connected", AlreadyConnected) - node.peerPool.connectingNodes.incl(result.remote) + node.peerPool.connectingNodes.incl(peer.remote) - await postHelloSteps(result, response) + await postHelloSteps(peer, response) ok = true - trace "Peer fully connected", peer = result.remote, clientId = response.clientId + trace "Peer fully connected", peer = peer.remote, clientId = response.clientId except PeerDisconnected as e: case e.reason of AlreadyConnected, TooManyPeers, MessageTimeout: - trace "Disconnect during rlpxAccept", reason = e.reason, peer = result.remote + trace "Disconnect during rlpxAccept", reason = e.reason, peer = peer.remote else: debug "Unexpected disconnect during rlpxAccept", reason = e.reason, - msg = e.msg, peer = result.remote + msg = e.msg, peer = peer.remote except TransportIncompleteError: - trace "Connection dropped in rlpxAccept", remote = result.remote + trace "Connection dropped in rlpxAccept", remote = peer.remote except UselessPeerError: - trace "Disconnecting useless peer", peer = result.remote + trace "Disconnecting useless peer", peer = peer.remote except RlpTypeMismatch: # Some peers report capabilities with names longer than 3 chars. We ignore # those for now. Maybe we should allow this though. @@ -1404,9 +1417,11 @@ proc rlpxAccept*(node: EthereumNode, error "Unexpected exception in rlpxAccept", exc = e.name, err = e.msg if not ok: - if not isNil(result.transport): - result.transport.close() - result = nil + if not isNil(peer.transport): + peer.transport.close() + return nil + else: + return peer when isMainModule: diff --git a/tests/p2p/test_auth.nim b/tests/p2p/test_auth.nim index ccd9a29..cd80445 100644 --- a/tests/p2p/test_auth.nim +++ b/tests/p2p/test_auth.nim @@ -221,7 +221,7 @@ suite "Ethereum P2P handshake test suite": proc newTestHandshake(flags: set[HandshakeFlag]): Handshake = if Initiator in flags: let pk = PrivateKey.fromHex(testValue("initiator_private_key"))[] - result = Handshake.tryInit(rng[], pk.toKeyPair(), flags)[] + result = Handshake.init(rng[], pk.toKeyPair(), flags) let epki = testValue("initiator_ephemeral_private_key") result.ephemeral = PrivateKey.fromHex(epki)[].toKeyPair() @@ -229,7 +229,7 @@ suite "Ethereum P2P handshake test suite": result.initiatorNonce[0..^1] = nonce[0..^1] elif Responder in flags: let pk = PrivateKey.fromHex(testValue("receiver_private_key"))[] - result = Handshake.tryInit(rng[], pk.toKeyPair(), flags)[] + result = Handshake.init(rng[], pk.toKeyPair(), flags) let epkr = testValue("receiver_ephemeral_private_key") result.ephemeral = PrivateKey.fromHex(epkr)[].toKeyPair() let nonce = fromHex(stripSpaces(testValue("receiver_nonce"))) @@ -333,7 +333,7 @@ suite "Ethereum P2P handshake test suite": proc newTestHandshake(flags: set[HandshakeFlag]): Handshake = if Initiator in flags: let pk = PrivateKey.fromHex(testE8Value("initiator_private_key"))[] - result = Handshake.tryInit(rng[], pk.toKeyPair(), flags)[] + result = Handshake.init(rng[], pk.toKeyPair(), flags) let esec = testE8Value("initiator_ephemeral_private_key") result.ephemeral = PrivateKey.fromHex(esec)[].toKeyPair() @@ -341,7 +341,7 @@ suite "Ethereum P2P handshake test suite": result.initiatorNonce[0..^1] = nonce[0..^1] elif Responder in flags: let pk = PrivateKey.fromHex(testE8Value("receiver_private_key"))[] - result = Handshake.tryInit(rng[], pk.toKeyPair(), flags)[] + result = Handshake.init(rng[], pk.toKeyPair(), flags) let esec = testE8Value("receiver_ephemeral_private_key") result.ephemeral = PrivateKey.fromHex(esec)[].toKeyPair() diff --git a/tests/p2p/test_crypt.nim b/tests/p2p/test_crypt.nim index ab32c50..b3dc6fa 100644 --- a/tests/p2p/test_crypt.nim +++ b/tests/p2p/test_crypt.nim @@ -95,14 +95,14 @@ suite "Ethereum RLPx encryption/decryption test suite": proc newTestHandshake(flags: set[HandshakeFlag]): Handshake = if Initiator in flags: let pk = PrivateKey.fromHex(testValue("initiator_private_key"))[] - result = Handshake.tryInit(rng[], pk.toKeyPair(), flags)[] + result = Handshake.init(rng[], pk.toKeyPair(), flags) let epki = testValue("initiator_ephemeral_private_key") result.ephemeral = PrivateKey.fromHex(epki)[].toKeyPair() let nonce = fromHex(stripSpaces(testValue("initiator_nonce"))) result.initiatorNonce[0..^1] = nonce[0..^1] elif Responder in flags: let pk = PrivateKey.fromHex(testValue("receiver_private_key"))[] - result = Handshake.tryInit(rng[], pk.toKeyPair(), flags)[] + result = Handshake.init(rng[], pk.toKeyPair(), flags) let epkr = testValue("receiver_ephemeral_private_key") result.ephemeral = PrivateKey.fromHex(epkr)[].toKeyPair() let nonce = fromHex(stripSpaces(testValue("receiver_nonce")))