nim-eth/eth/p2p/rlpxtransport.nim

244 lines
8.0 KiB
Nim

# nim-eth
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
{.push raises: [], gcsafe.}
import results, chronos, ../common/keys, ./[auth, rlpxcrypt]
export results, keys
type
RlpxTransport* = ref object
stream: StreamTransport
state: SecretState
pubkey*: PublicKey
RlpxTransportError* = object of CatchableError
template `^`(arr): auto =
# passes a stack array with a matching `arrLen` variable as an open array
arr.toOpenArray(0, `arr Len` - 1)
proc initiatorHandshake(
rng: ref HmacDrbgContext,
keys: KeyPair,
stream: StreamTransport,
remotePubkey: PublicKey,
): Future[ConnectionSecret] {.
async: (raises: [CancelledError, TransportError, RlpxTransportError])
.} =
# https://github.com/ethereum/devp2p/blob/5713591d0366da78a913a811c7502d9ca91d29a8/rlpx.md#initial-handshake
var
handshake = Handshake.init(rng[], keys, {Initiator})
authMsg: array[AuthMessageMaxEIP8, byte]
let
authMsgLen = handshake.authMessage(rng[], remotePubkey, authMsg).expect(
"No errors with correctly sized buffer"
)
writeRes = await stream.write(addr authMsg[0], authMsgLen)
if writeRes != authMsgLen:
# TOOD raising a chronos error here is a hack - rework using something else
raise (ref TransportIncompleteError)(msg: "Could not write RLPx handshake header")
var ackMsg = newSeqOfCap[byte](1024)
ackMsg.setLen(MsgLenLenEIP8)
await stream.readExactly(addr ackMsg[0], len(ackMsg))
let ackMsgLen = handshake.decodeAckMsgLen(ackMsg).valueOr:
raise
(ref RlpxTransportError)(msg: "Could not decode handshake ack length: " & $error)
ackMsg.setLen(ackMsgLen)
await stream.readExactly(addr ackMsg[MsgLenLenEIP8], ackMsgLen - MsgLenLenEIP8)
handshake.decodeAckMessage(ackMsg).isOkOr:
raise (ref RlpxTransportError)(msg: "Could not decode handshake ack: " & $error)
handshake.getSecrets(^authMsg, ackMsg)
proc responderHandshake(
rng: ref HmacDrbgContext, keys: KeyPair, stream: StreamTransport
): Future[(ConnectionSecret, PublicKey)] {.
async: (raises: [CancelledError, TransportError, RlpxTransportError])
.} =
# https://github.com/ethereum/devp2p/blob/5713591d0366da78a913a811c7502d9ca91d29a8/rlpx.md#initial-handshake
var
handshake = Handshake.init(rng[], keys, {auth.Responder})
authMsg = newSeqOfCap[byte](1024)
authMsg.setLen(MsgLenLenEIP8)
await stream.readExactly(addr authMsg[0], len(authMsg))
let authMsgLen = handshake.decodeAuthMsgLen(authMsg).valueOr:
raise
(ref RlpxTransportError)(msg: "Could not decode handshake auth length: " & $error)
authMsg.setLen(authMsgLen)
await stream.readExactly(addr authMsg[MsgLenLenEIP8], authMsgLen - MsgLenLenEIP8)
handshake.decodeAuthMessage(authMsg).isOkOr:
raise (ref RlpxTransportError)(
msg: "Could not decode handshake auth message: " & $error
)
var ackMsg: array[AckMessageMaxEIP8, byte]
let ackMsgLen =
handshake.ackMessage(rng[], ackMsg).expect("no errors with correcly sized buffer")
var res = await stream.write(addr ackMsg[0], ackMsgLen)
if res != ackMsgLen:
# TOOD raising a chronos error here is a hack - rework using something else
raise (ref TransportIncompleteError)(msg: "Could not write RLPx ack message")
(handshake.getSecrets(authMsg, ^ackMsg), handshake.remoteHPubkey)
proc connect*(
_: type RlpxTransport,
rng: ref HmacDrbgContext,
keys: KeyPair,
address: TransportAddress,
remotePubkey: PublicKey,
): Future[RlpxTransport] {.
async: (raises: [CancelledError, TransportError, RlpxTransportError])
.} =
var stream = await connect(address)
try:
let secrets = await initiatorHandshake(rng, keys, stream, remotePubkey)
var res = RlpxTransport(stream: move(stream), pubkey: remotePubkey)
initSecretState(secrets, res.state)
res
finally:
if stream != nil:
stream.close()
proc accept*(
_: type RlpxTransport,
rng: ref HmacDrbgContext,
keys: KeyPair,
stream: StreamTransport,
): Future[RlpxTransport] {.
async: (raises: [CancelledError, TransportError, RlpxTransportError])
.} =
var stream = stream
try:
let (secrets, remotePubkey) = await responderHandshake(rng, keys, stream)
var res = RlpxTransport(stream: move(stream), pubkey: remotePubkey)
initSecretState(secrets, res.state)
res
finally:
if stream != nil:
stream.close()
proc recvMsg*(
transport: RlpxTransport
): Future[seq[byte]] {.
async: (raises: [CancelledError, TransportError, RlpxTransportError])
.} =
## Read an RLPx frame from the given peer
var msgHeaderEnc: RlpxEncryptedHeader
await transport.stream.readExactly(addr msgHeaderEnc[0], msgHeaderEnc.len)
let msgHeader = decryptHeader(transport.state, msgHeaderEnc).valueOr:
raise (ref RlpxTransportError)(msg: "Cannot decrypt RLPx frame header")
# The header has more fields than the size, but they are unused / obsolete.
# Although some clients set them, we don't check this in the spirit of EIP-8
# https://github.com/ethereum/devp2p/blob/5713591d0366da78a913a811c7502d9ca91d29a8/rlpx.md#framing
let msgSize = msgHeader.getBodySize()
let remainingBytes = encryptedLength(msgSize) - 32
var encryptedBytes = newSeq[byte](remainingBytes)
await transport.stream.readExactly(addr encryptedBytes[0], len(encryptedBytes))
let decryptedMaxLength = decryptedLength(msgSize) # Padded length
var msgBody = newSeq[byte](decryptedMaxLength)
if decryptBody(transport.state, encryptedBytes, msgSize, msgBody).isErr():
raise (ref RlpxTransportError)(msg: "Cannot decrypt message body")
reset(encryptedBytes) # Release memory (TODO: in-place decryption)
msgBody.setLen(msgSize) # Remove padding
msgBody
proc sendMsg*(
transport: RlpxTransport, data: seq[byte]
) {.async: (raises: [CancelledError, TransportError, RlpxTransportError]).} =
let cipherText = encryptMsg(data, transport.state)
var res = await transport.stream.write(cipherText)
if res != len(cipherText):
# TOOD raising a chronos error here is a hack - rework using something else
raise (ref TransportIncompleteError)(msg: "Could not complete writing message")
proc remoteAddress*(
transport: RlpxTransport
): TransportAddress {.raises: [TransportOsError].} =
transport.stream.remoteAddress()
proc closed*(transport: RlpxTransport): bool =
transport.stream != nil and transport.stream.closed
proc close*(transport: RlpxTransport) =
if transport.stream != nil:
transport.stream.close()
proc closeWait*(
transport: RlpxTransport
): Future[void] {.async: (raises: [], raw: true).} =
transport.stream.closeWait()
when isMainModule:
# Simple CLI application for negotiating an RLPx connection with a peer
import stew/byteutils, std/cmdline, std/strutils, eth/rlp
if paramCount() < 3:
echo "rlpxtransport ip port pubkey"
quit 1
let
rng = newRng()
kp = KeyPair.random(rng[])
echo "Local key: ", toHex(kp.pubkey.toRaw())
let client = waitFor RlpxTransport.connect(
rng,
kp,
initTAddress(paramStr(1), parseInt(paramStr(2))),
PublicKey.fromHex(paramStr(3))[],
)
proc encodeMsg(msgId: uint64, msg: auto): seq[byte] =
var rlpWriter = initRlpWriter()
rlpWriter.append msgId
rlpWriter.appendRecordType(msg, typeof(msg).rlpFieldsCount > 1)
rlpWriter.finish
waitFor client.sendMsg(
encodeMsg(
uint64 0, (uint64 4, "nimbus", @[("eth", uint64 68)], uint64 0, kp.pubkey.toRaw())
)
)
while true:
echo "Reading message"
var data = waitFor client.recvMsg()
var rlp = rlpFromBytes(data)
let msgId = rlp.read(uint64)
if msgId == 0:
echo "Hello: ",
rlp.read((uint64, string, seq[(string, uint64)], uint64, seq[byte]))
else:
echo "Unknown message ", msgId, " ", toHex(data)