Merge pull request #484 from status-im/jordan/nethermind-sync

Jordan/nethermind sync
This commit is contained in:
Jordan Hrycaj 2022-03-28 09:34:51 +01:00 committed by GitHub
commit 4c7cdcaaf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 150 additions and 4 deletions

View File

@ -13,6 +13,17 @@ import
".."/[rlp, common, keys, async_utils],
./private/p2p_types, "."/[kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl]
const
# Insane kludge for suppporting chunked messages when syncing against clients
# like Nethermind.
#
# The original specs which are now obsoleted can be found here:
# github.com/ethereum/devp2p/commit/6504d410bc4b8dda2b43941e1cb48c804b90cf22.
#
# The current requirement is stated at
# github.com/ethereum/devp2p/blob/master/rlpx.md#framing
allowObsoletedChunkedMessages = defined(chunked_rlpx_enabled)
when useSnappy:
import snappy
const devp2pSnappyVersion* = 5
@ -233,7 +244,8 @@ proc getDispatcher(node: EthereumNode,
proc getMsgName*(peer: Peer, msgId: int): string =
if not peer.dispatcher.isNil and
msgId < peer.dispatcher.messages.len:
msgId < peer.dispatcher.messages.len and
not peer.dispatcher.messages[msgId].isNil:
return peer.dispatcher.messages[msgId].name
else:
return case msgId
@ -478,6 +490,43 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
debug "late or duplicate reply for a RLPx request"
proc getRlpxHeaderData(header: RlpxHeader): (int,int,int) =
## Helper for `recvMsg()`
# This is insane. Some clients like Nethermind use the now obsoleted
# chunked message frame protocol, see
# github.com/ethereum/devp2p/commit/6504d410bc4b8dda2b43941e1cb48c804b90cf22.
result = (-1, -1, 0)
proc datagramSize: int =
# For logging only
(header[0].int shl 16) or (header[1].int shl 8) or header[1].int
try:
let optsLen = max(0, header[3].int - 0xc0)
var hdrData = header[4 ..< 4 + optsLen].rlpFromBytes
result[0] = hdrData.read(int) # capability ID
result[1] = hdrData.read(int) # context ID
if hdrData.isBlob:
result[2] = hdrData.read(int) # total packet size
trace "RLPx message first chunked header-data",
capabilityId = result[0],
contextId = result[1],
totalPacketSize = result[2],
datagramSize = datagramSize()
#[
elif 0 < result[1]:
# This should be all zero according to latest specs
trace "RLPx message chunked next header-data",
capabilityId = result[0],
contextId = result[1],
datagramSize = datagramSize()
#]#
except:
error "RLPx message header-data options, parse error",
capabilityId = result[0],
contextId = result[1],
totalPacketSize = result[2],
datagramSize = datagramSize()
result = (-1, -1, -1)
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
## This procs awaits the next complete RLPx message in the TCP stream
@ -485,8 +534,9 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
await peer.transport.readExactly(addr headerBytes[0], 32)
var msgSize: int
var msgHeader: RlpxHeader
if decryptHeaderAndGetMsgSize(peer.secretsState,
headerBytes, msgSize).isErr():
headerBytes, msgSize, msgHeader).isErr():
await peer.disconnectAndRaise(BreachOfProtocol,
"Cannot decrypt RLPx frame header")
@ -522,16 +572,104 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
if decryptedBytes.len == 0:
await peer.disconnectAndRaise(BreachOfProtocol,
"Snappy uncompress encountered malformed data")
# Check embedded header-data for start of an obsoleted chunked message.
#
# The current RLPx requirements need all triple entries <= 0, see
# github.com/ethereum/devp2p/blob/master/rlpx.md#framing
let (capaId, ctxId, totalMsgSize) = msgHeader.getRlpxHeaderData
when not allowObsoletedChunkedMessages:
# Note that the check should come *before* the `msgId` is read. For
# instance, if this is a malformed packet, then the `msgId` might be
# random which in turn might try to access a `peer.dispatcher.messages[]`
# slot with a `nil` entry.
if 0 < capaId or 0 < ctxId or 0 < totalMsgSize:
await peer.disconnectAndRaise(
BreachOfProtocol, "Rejected obsoleted chunked message header")
var rlp = rlpFromBytes(decryptedBytes)
var msgId: int32
try:
# int32 as this seems more than big enough for the amount of msgIds
let msgId = rlp.read(int32)
return (msgId.int, rlp)
msgId = rlp.read(int32)
result = (msgId.int, rlp)
except RlpError:
await peer.disconnectAndRaise(BreachOfProtocol,
"Cannot read RLPx message id")
# Handle chunked messages
when allowObsoletedChunkedMessages:
# Snappy with obsolete chunked RLPx message datagrams is unsupported here
when useSnappy:
if peer.snappyEnabled:
return
# This also covers totalMessageSize <= 0
if totalMsgSize <= msgSize:
return
# Loop over chunked RLPx datagram fragments
var moreData = totalMsgSize - msgSize
while 0 < moreData:
# Load and parse next header
block:
await peer.transport.readExactly(addr headerBytes[0], 32)
if decryptHeaderAndGetMsgSize(peer.secretsState,
headerBytes, msgSize, msgHeader).isErr():
trace "RLPx next chunked header-data failed",
peer, msgId, ctxId, maxSize = moreData
await peer.disconnectAndRaise(
BreachOfProtocol, "Cannot decrypt next chunked RLPx header")
# Verify that this is really the next chunk
block:
let (_, ctyId, totalSize) = msgHeader.getRlpxHeaderData
if ctyId != ctxId or 0 < totalSize:
trace "Malformed RLPx next chunked header-data",
peer, msgId, msgSize, ctxtId = ctyId, expCtxId = ctxId, totalSize
await peer.disconnectAndRaise(
BreachOfProtocol, "Malformed next chunked RLPx header")
# Append payload to `decryptedBytes` collector
block:
var encBytes = newSeq[byte](msgSize.encryptedLength - 32)
await peer.transport.readExactly(addr encBytes[0], encBytes.len)
var
dcrBytes = newSeq[byte](msgSize.decryptedLength)
dcrBytesCount = 0
# TODO: This should be improved by passing a reference into
# `decryptedBytes` where to append the data.
if decryptBody(peer.secretsState, encBytes, msgSize,
dcrBytes, dcrBytesCount).isErr():
await peer.disconnectAndRaise(
BreachOfProtocol, "Cannot decrypt next chunked RLPx frame body")
decryptedBytes.add dcrBytes[0 ..< dcrBytesCount]
moreData -= msgSize
#[
trace "RLPx next chunked datagram fragment",
peer, msgId = result[0], ctxId, msgSize, moreData, totalMsgSize,
dcrBytesCount, payloadSoFar = decryptedBytes.len
#]#
# End While
if moreData != 0:
await peer.disconnectAndRaise(
BreachOfProtocol, "Malformed assembly of chunked RLPx message")
# Pass back extended message (first entry remains `msgId`)
result[1] = decryptedBytes.rlpFromBytes
result[1].position = rlp.position
trace "RLPx chunked datagram payload",
peer, msgId, ctxId, totalMsgSize, moreData, payload = decryptedBytes.len
# End `allowObsoletedChunkedMessages`
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type):
auto {.raises: [RlpError, Defect].} =
when defined(release):

View File

@ -200,6 +200,14 @@ proc decryptHeader*(c: var SecretState, data: openArray[byte],
c.aesdec.decrypt(toa(data, 0, RlpHeaderLength), output)
result = ok()
proc decryptHeaderAndGetMsgSize*(c: var SecretState,
encryptedHeader: openArray[byte],
outSize: var int,
outHeader: var RlpxHeader): RlpxResult[void] =
result = decryptHeader(c, encryptedHeader, outHeader)
if result.isOk():
outSize = outHeader.getBodySize
proc decryptHeaderAndGetMsgSize*(c: var SecretState,
encryptedHeader: openArray[byte],
outSize: var int): RlpxResult[void] =