diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index fe67874..d536d74 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -13,6 +13,17 @@ import ".."/[rlp, common, keys, async_utils], ./private/p2p_types, "."/[kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl] +const + # Insane kludge for suppporting chunked messages when syncing against clients + # like Nethermind. + # + # The original specs which are now obsoleted can be found here: + # github.com/ethereum/devp2p/commit/6504d410bc4b8dda2b43941e1cb48c804b90cf22. + # + # The current requirement is stated at + # github.com/ethereum/devp2p/blob/master/rlpx.md#framing + allowObsoletedChunkedMessages = defined(chunked_rlpx_enabled) + when useSnappy: import snappy const devp2pSnappyVersion* = 5 @@ -233,7 +244,8 @@ proc getDispatcher(node: EthereumNode, proc getMsgName*(peer: Peer, msgId: int): string = if not peer.dispatcher.isNil and - msgId < peer.dispatcher.messages.len: + msgId < peer.dispatcher.messages.len and + not peer.dispatcher.messages[msgId].isNil: return peer.dispatcher.messages[msgId].name else: return case msgId @@ -478,6 +490,43 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) = debug "late or duplicate reply for a RLPx request" +proc getRlpxHeaderData(header: RlpxHeader): (int,int,int) = + ## Helper for `recvMsg()` + # This is insane. Some clients like Nethermind use the now obsoleted + # chunked message frame protocol, see + # github.com/ethereum/devp2p/commit/6504d410bc4b8dda2b43941e1cb48c804b90cf22. + result = (-1, -1, 0) + proc datagramSize: int = + # For logging only + (header[0].int shl 16) or (header[1].int shl 8) or header[1].int + try: + let optsLen = max(0, header[3].int - 0xc0) + var hdrData = header[4 ..< 4 + optsLen].rlpFromBytes + result[0] = hdrData.read(int) # capability ID + result[1] = hdrData.read(int) # context ID + if hdrData.isBlob: + result[2] = hdrData.read(int) # total packet size + trace "RLPx message first chunked header-data", + capabilityId = result[0], + contextId = result[1], + totalPacketSize = result[2], + datagramSize = datagramSize() + #[ + elif 0 < result[1]: + # This should be all zero according to latest specs + trace "RLPx message chunked next header-data", + capabilityId = result[0], + contextId = result[1], + datagramSize = datagramSize() + #]# + except: + error "RLPx message header-data options, parse error", + capabilityId = result[0], + contextId = result[1], + totalPacketSize = result[2], + datagramSize = datagramSize() + result = (-1, -1, -1) + proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = ## This procs awaits the next complete RLPx message in the TCP stream @@ -485,8 +534,9 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = await peer.transport.readExactly(addr headerBytes[0], 32) var msgSize: int + var msgHeader: RlpxHeader if decryptHeaderAndGetMsgSize(peer.secretsState, - headerBytes, msgSize).isErr(): + headerBytes, msgSize, msgHeader).isErr(): await peer.disconnectAndRaise(BreachOfProtocol, "Cannot decrypt RLPx frame header") @@ -522,16 +572,104 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = if decryptedBytes.len == 0: await peer.disconnectAndRaise(BreachOfProtocol, "Snappy uncompress encountered malformed data") + + # Check embedded header-data for start of an obsoleted chunked message. + # + # The current RLPx requirements need all triple entries <= 0, see + # github.com/ethereum/devp2p/blob/master/rlpx.md#framing + let (capaId, ctxId, totalMsgSize) = msgHeader.getRlpxHeaderData + + when not allowObsoletedChunkedMessages: + # Note that the check should come *before* the `msgId` is read. For + # instance, if this is a malformed packet, then the `msgId` might be + # random which in turn might try to access a `peer.dispatcher.messages[]` + # slot with a `nil` entry. + if 0 < capaId or 0 < ctxId or 0 < totalMsgSize: + await peer.disconnectAndRaise( + BreachOfProtocol, "Rejected obsoleted chunked message header") + var rlp = rlpFromBytes(decryptedBytes) + var msgId: int32 try: # int32 as this seems more than big enough for the amount of msgIds - let msgId = rlp.read(int32) - return (msgId.int, rlp) + msgId = rlp.read(int32) + result = (msgId.int, rlp) except RlpError: await peer.disconnectAndRaise(BreachOfProtocol, "Cannot read RLPx message id") + # Handle chunked messages + when allowObsoletedChunkedMessages: + # Snappy with obsolete chunked RLPx message datagrams is unsupported here + when useSnappy: + if peer.snappyEnabled: + return + + # This also covers totalMessageSize <= 0 + if totalMsgSize <= msgSize: + return + + # Loop over chunked RLPx datagram fragments + var moreData = totalMsgSize - msgSize + while 0 < moreData: + + # Load and parse next header + block: + await peer.transport.readExactly(addr headerBytes[0], 32) + if decryptHeaderAndGetMsgSize(peer.secretsState, + headerBytes, msgSize, msgHeader).isErr(): + trace "RLPx next chunked header-data failed", + peer, msgId, ctxId, maxSize = moreData + await peer.disconnectAndRaise( + BreachOfProtocol, "Cannot decrypt next chunked RLPx header") + + # Verify that this is really the next chunk + block: + let (_, ctyId, totalSize) = msgHeader.getRlpxHeaderData + if ctyId != ctxId or 0 < totalSize: + trace "Malformed RLPx next chunked header-data", + peer, msgId, msgSize, ctxtId = ctyId, expCtxId = ctxId, totalSize + await peer.disconnectAndRaise( + BreachOfProtocol, "Malformed next chunked RLPx header") + + # Append payload to `decryptedBytes` collector + block: + var encBytes = newSeq[byte](msgSize.encryptedLength - 32) + await peer.transport.readExactly(addr encBytes[0], encBytes.len) + var + dcrBytes = newSeq[byte](msgSize.decryptedLength) + dcrBytesCount = 0 + # TODO: This should be improved by passing a reference into + # `decryptedBytes` where to append the data. + if decryptBody(peer.secretsState, encBytes, msgSize, + dcrBytes, dcrBytesCount).isErr(): + await peer.disconnectAndRaise( + BreachOfProtocol, "Cannot decrypt next chunked RLPx frame body") + decryptedBytes.add dcrBytes[0 ..< dcrBytesCount] + moreData -= msgSize + #[ + trace "RLPx next chunked datagram fragment", + peer, msgId = result[0], ctxId, msgSize, moreData, totalMsgSize, + dcrBytesCount, payloadSoFar = decryptedBytes.len + #]# + + # End While + + if moreData != 0: + await peer.disconnectAndRaise( + BreachOfProtocol, "Malformed assembly of chunked RLPx message") + + # Pass back extended message (first entry remains `msgId`) + result[1] = decryptedBytes.rlpFromBytes + result[1].position = rlp.position + + trace "RLPx chunked datagram payload", + peer, msgId, ctxId, totalMsgSize, moreData, payload = decryptedBytes.len + + # End `allowObsoletedChunkedMessages` + + proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.raises: [RlpError, Defect].} = when defined(release): diff --git a/eth/p2p/rlpxcrypt.nim b/eth/p2p/rlpxcrypt.nim index 309051d..193bb41 100644 --- a/eth/p2p/rlpxcrypt.nim +++ b/eth/p2p/rlpxcrypt.nim @@ -200,6 +200,14 @@ proc decryptHeader*(c: var SecretState, data: openArray[byte], c.aesdec.decrypt(toa(data, 0, RlpHeaderLength), output) result = ok() +proc decryptHeaderAndGetMsgSize*(c: var SecretState, + encryptedHeader: openArray[byte], + outSize: var int, + outHeader: var RlpxHeader): RlpxResult[void] = + result = decryptHeader(c, encryptedHeader, outHeader) + if result.isOk(): + outSize = outHeader.getBodySize + proc decryptHeaderAndGetMsgSize*(c: var SecretState, encryptedHeader: openArray[byte], outSize: var int): RlpxResult[void] =