mirror of https://github.com/status-im/nim-eth.git
Support obsolete chunked RLPx wire protocol
why: For some reason, Nethermind insists on sending chunked messages to the syncing peer. Unfortunately, for the test networks the Nethermind modes are the importent ones as they speak eth/65 as well while others like Geth only support eth/66 which is not implemented here, yet.
This commit is contained in:
parent
0feefab9ce
commit
2ff455f26d
115
eth/p2p/rlpx.nim
115
eth/p2p/rlpx.nim
|
@ -453,6 +453,43 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
||||||
|
|
||||||
debug "late or duplicate reply for a RLPx request"
|
debug "late or duplicate reply for a RLPx request"
|
||||||
|
|
||||||
|
proc getRlpxHeaderData(header: RlpxHeader): (int,int,int) =
|
||||||
|
## Helper for `recvMsg()`
|
||||||
|
# This is insane. Some clients like Nethermid use the now obsoleted
|
||||||
|
# chunked frame protocol, see
|
||||||
|
# github.com/ethereum/devp2p/commit/6504d410bc4b8dda2b43941e1cb48c804b90cf22.
|
||||||
|
result = (-1, -1, 0)
|
||||||
|
proc datagramSize: int =
|
||||||
|
# For logging only
|
||||||
|
(header[0].int shl 16) or (header[1].int shl 8) or header[1].int
|
||||||
|
try:
|
||||||
|
let optsLen = max(0, header[3].int - 0xc0)
|
||||||
|
var hdrData = header[4 ..< 4 + optsLen].rlpFromBytes
|
||||||
|
result[0] = hdrData.read(int) # capability ID
|
||||||
|
result[1] = hdrData.read(int) # context ID
|
||||||
|
if hdrData.isBlob:
|
||||||
|
result[2] = hdrData.read(int) # total packet size
|
||||||
|
trace "RLPx message first chunked header-data",
|
||||||
|
capabilityId = result[0],
|
||||||
|
contextId = result[1],
|
||||||
|
totalPacketSize = result[2],
|
||||||
|
datagramSize = datagramSize()
|
||||||
|
#[
|
||||||
|
elif 0 < result[1]:
|
||||||
|
# This should be all zero according to latest specs
|
||||||
|
trace "RLPx message chunked next header-data",
|
||||||
|
capabilityId = result[0],
|
||||||
|
contextId = result[1],
|
||||||
|
datagramSize = datagramSize()
|
||||||
|
#]#
|
||||||
|
except:
|
||||||
|
error "RLPx message header-data options, parse error",
|
||||||
|
capabilityId = result[0],
|
||||||
|
contextId = result[1],
|
||||||
|
totalPacketSize = result[2],
|
||||||
|
datagramSize = datagramSize()
|
||||||
|
result = (-1, -1, -1)
|
||||||
|
|
||||||
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
||||||
## This procs awaits the next complete RLPx message in the TCP stream
|
## This procs awaits the next complete RLPx message in the TCP stream
|
||||||
|
|
||||||
|
@ -460,8 +497,9 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
||||||
await peer.transport.readExactly(addr headerBytes[0], 32)
|
await peer.transport.readExactly(addr headerBytes[0], 32)
|
||||||
|
|
||||||
var msgSize: int
|
var msgSize: int
|
||||||
|
var msgHeader: RlpxHeader
|
||||||
if decryptHeaderAndGetMsgSize(peer.secretsState,
|
if decryptHeaderAndGetMsgSize(peer.secretsState,
|
||||||
headerBytes, msgSize).isErr():
|
headerBytes, msgSize, msgHeader).isErr():
|
||||||
await peer.disconnectAndRaise(BreachOfProtocol,
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
"Cannot decrypt RLPx frame header")
|
"Cannot decrypt RLPx frame header")
|
||||||
|
|
||||||
|
@ -499,14 +537,85 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
||||||
"Snappy uncompress encountered malformed data")
|
"Snappy uncompress encountered malformed data")
|
||||||
var rlp = rlpFromBytes(decryptedBytes)
|
var rlp = rlpFromBytes(decryptedBytes)
|
||||||
|
|
||||||
|
var msgId: int32
|
||||||
try:
|
try:
|
||||||
# int32 as this seems more than big enough for the amount of msgIds
|
# int32 as this seems more than big enough for the amount of msgIds
|
||||||
let msgId = rlp.read(int32)
|
msgId = rlp.read(int32)
|
||||||
return (msgId.int, rlp)
|
result = (msgId.int, rlp)
|
||||||
except RlpError:
|
except RlpError:
|
||||||
await peer.disconnectAndRaise(BreachOfProtocol,
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
"Cannot read RLPx message id")
|
"Cannot read RLPx message id")
|
||||||
|
|
||||||
|
# Snappy with obsolete chunked RLPx message datagrams is unsupported here
|
||||||
|
when useSnappy:
|
||||||
|
if peer.snappyEnabled:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check embedded header-data for start of a chunked message
|
||||||
|
let (capaId, ctxId, totalMsgSize) = msgHeader.getRlpxHeaderData
|
||||||
|
|
||||||
|
# This also covers totalMessageSize <= 0
|
||||||
|
if totalMsgSize <= msgSize:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Loop over chunked RLPx datagram fragments
|
||||||
|
var moreData = totalMsgSize - msgSize
|
||||||
|
while 0 < moreData:
|
||||||
|
|
||||||
|
# Load and parse next header
|
||||||
|
block:
|
||||||
|
await peer.transport.readExactly(addr headerBytes[0], 32)
|
||||||
|
if decryptHeaderAndGetMsgSize(peer.secretsState,
|
||||||
|
headerBytes, msgSize, msgHeader).isErr():
|
||||||
|
trace "RLPx next chunked header-data failed",
|
||||||
|
peer, msgId, ctxId, maxSize = moreData
|
||||||
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
|
"Cannot decrypt next chunked RLPx header")
|
||||||
|
|
||||||
|
# Verify that this is really the next chunk
|
||||||
|
block:
|
||||||
|
let (_, ctyId, totalSize) = msgHeader.getRlpxHeaderData
|
||||||
|
if ctyId != ctxId or 0 < totalSize:
|
||||||
|
trace "Malformed RLPx next chunked header-data",
|
||||||
|
peer, msgId, msgSize, ctxtId = ctyId, expCtxId = ctxId, totalSize
|
||||||
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
|
"Malformed next chunked RLPx header")
|
||||||
|
|
||||||
|
# Append payload to `decryptedBytes` collector
|
||||||
|
block:
|
||||||
|
var encBytes = newSeq[byte](msgSize.encryptedLength - 32)
|
||||||
|
await peer.transport.readExactly(addr encBytes[0], encBytes.len)
|
||||||
|
var
|
||||||
|
dcrBytes = newSeq[byte](msgSize.decryptedLength)
|
||||||
|
dcrBytesCount = 0
|
||||||
|
# TODO: This should be improved by passing a reference into
|
||||||
|
# `decryptedBytes` where to append the data.
|
||||||
|
if decryptBody(peer.secretsState, encBytes, msgSize,
|
||||||
|
dcrBytes, dcrBytesCount).isErr():
|
||||||
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
|
"Cannot decrypt next chunked RLPx frame body")
|
||||||
|
decryptedBytes.add dcrBytes[0 ..< dcrBytesCount]
|
||||||
|
moreData -= msgSize
|
||||||
|
#[
|
||||||
|
trace "RLPx next chunked datagram fragment",
|
||||||
|
peer, msgId = result[0], ctxId, msgSize, moreData, totalMsgSize,
|
||||||
|
dcrBytesCount, payloadSoFar = decryptedBytes.len
|
||||||
|
#]#
|
||||||
|
|
||||||
|
# End While
|
||||||
|
|
||||||
|
if moreData != 0:
|
||||||
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
|
"Malformed assembly of chunked RLPx message")
|
||||||
|
|
||||||
|
# Pass back extended message (first entry remains `msgId`)
|
||||||
|
result[1] = decryptedBytes.rlpFromBytes
|
||||||
|
result[1].position = rlp.position
|
||||||
|
|
||||||
|
trace "RLPx chunked datagram payload",
|
||||||
|
peer, msgId, ctxId, totalMsgSize, moreData, payload = decryptedBytes.len
|
||||||
|
|
||||||
|
|
||||||
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type):
|
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type):
|
||||||
auto {.raises: [RlpError, Defect].} =
|
auto {.raises: [RlpError, Defect].} =
|
||||||
when defined(release):
|
when defined(release):
|
||||||
|
|
|
@ -200,6 +200,14 @@ proc decryptHeader*(c: var SecretState, data: openArray[byte],
|
||||||
c.aesdec.decrypt(toa(data, 0, RlpHeaderLength), output)
|
c.aesdec.decrypt(toa(data, 0, RlpHeaderLength), output)
|
||||||
result = ok()
|
result = ok()
|
||||||
|
|
||||||
|
proc decryptHeaderAndGetMsgSize*(c: var SecretState,
|
||||||
|
encryptedHeader: openArray[byte],
|
||||||
|
outSize: var int,
|
||||||
|
outHeader: var RlpxHeader): RlpxResult[void] =
|
||||||
|
result = decryptHeader(c, encryptedHeader, outHeader)
|
||||||
|
if result.isOk():
|
||||||
|
outSize = outHeader.getBodySize
|
||||||
|
|
||||||
proc decryptHeaderAndGetMsgSize*(c: var SecretState,
|
proc decryptHeaderAndGetMsgSize*(c: var SecretState,
|
||||||
encryptedHeader: openArray[byte],
|
encryptedHeader: openArray[byte],
|
||||||
outSize: var int): RlpxResult[void] =
|
outSize: var int): RlpxResult[void] =
|
||||||
|
|
Loading…
Reference in New Issue