devp2p: upgrade to v5 (EIP-706) (#760)

* drop support for v4 (obsolete, doesn't work with all clients since
they use chunking and other obsolete v4 features that we're missing or
don't support it at all)
* rework asyncraises
* always store generated p2p macro code (similar to eth2)
* preparation for chronos cancellation support (more to be done here)
* when peer is disconnected, ensure pending handshakes and requests are
notified (instead of waiting for timeout)
* disallow raising from `onPeerDisconnected` - this simplifies
disconnection coordination among async tasks
* introduce several warning logs for protocol breaches - these should be
removed eventually, pending q/a on the rlpx layer in general
* fix snappy compression - the payload without msgId should be
compressed
* remove strict checks on unused fields in RLPx message header (this
matches geth behavior and the spirit of EIP-8)
* add snappy dep
This commit is contained in:
Jacek Sieka 2024-11-08 03:44:04 +01:00 committed by GitHub
parent 034b7886de
commit 88e4be4dc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 643 additions and 585 deletions

View File

@ -20,7 +20,8 @@ requires "nim >= 1.6.0",
"testutils",
"unittest2",
"results",
"minilru"
"minilru",
"snappy"
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)

View File

@ -79,7 +79,6 @@ proc newEthereumNode*(
networkId: NetworkId,
clientId = "nim-eth-p2p",
addAllCapabilities = true,
useCompression: bool = false,
minPeers = 10,
bootstrapNodes: seq[ENode] = @[],
bindUdpPort: Port,
@ -105,11 +104,6 @@ proc newEthereumNode*(
keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng)
result.rng = rng
when useSnappy:
result.protocolVersion = if useCompression: devp2pSnappyVersion
else: devp2pVersion
result.protocolStates.newSeq protocolCount()
result.peerPool = newPeerPool(

View File

@ -35,7 +35,7 @@ type
msgResponse
Message* = ref object
id*: Opt[uint64]
id*: uint64
ident*: NimNode
kind*: MessageKind
procDef*: NimNode
@ -351,15 +351,17 @@ proc init*(T: type P2PProtocol, backendFactory: BackendFactory,
if not result.backend.afterProtocolInit.isNil:
result.backend.afterProtocolInit(result)
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = Opt.none(uint64)) =
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, canRaise: bool, msgId = Opt.none(uint64)) =
## This procs adds a set of common helpers available in all messages handlers
## (e.g. `perProtocolMsgId`, `peer.state`, etc).
userHandlerProc.addPragma ident"gcsafe"
# we only take the pragma
let dummy = quote do:
proc dummy(): Future[void] {.async: (raises: [EthP2PError]).}
let dummy = if canRaise:
quote do:
proc dummy(): Future[void] {.async: (raises: [CancelledError, EthP2PError]).}
else:
quote do:
proc dummy(): Future[void] {.async: (raises: []).}
if p.isRlpx:
userHandlerProc.addPragma dummy.pragma[0]
@ -402,27 +404,16 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = Opt.no
template networkState(`peerVar`: `PeerType`): `NetworkStateType` {.used.} =
`NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`))
proc addExceptionHandler(userHandlerProc: NimNode) =
let bodyTemp = userHandlerProc.body
userHandlerProc.body = quote do:
try:
`bodyTemp`
except CancelledError as exc:
raise newException(EthP2PError, exc.msg)
except CatchableError as exc:
raise newException(EthP2PError, exc.msg)
proc addPreludeDefs(userHandlerProc: NimNode, definitions: NimNode) =
userHandlerProc.body[0].add definitions
proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode =
proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string, canRaise: bool): NimNode =
## Turns a "named" do block to a regular async proc
## (e.g. onPeerConnected do ...)
result = newTree(nnkProcDef)
doBlock.copyChildrenTo(result)
result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName)
p.augmentUserHandler result
result.addExceptionHandler()
p.augmentUserHandler result, canRaise
proc addTimeoutParam(procDef: NimNode, defaultValue: int64) =
var
@ -477,7 +468,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint64,
recBody = newTree(nnkDistinctTy, recName)
result = Message(protocol: protocol,
id: Opt.some(msgId),
id: msgId,
ident: msgIdent,
kind: kind,
procDef: procDef,
@ -489,7 +480,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint64,
if procDef.body.kind != nnkEmpty:
var userHandler = copy procDef
protocol.augmentUserHandler userHandler, Opt.some(msgId)
protocol.augmentUserHandler userHandler, true, Opt.some(msgId)
userHandler.name = ident(msgName & "UserHandler")
# Request and Response handlers get an extra `reqId` parameter if the
@ -518,7 +509,6 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint64,
of msgResponse: userHandler.applyDecorator protocol.incomingResponseDecorator
else: discard
userHandler.addExceptionHandler()
result.userHandler = userHandler
protocol.outRecvProcs.add result.userHandler
@ -543,7 +533,7 @@ proc addMsg(p: P2PProtocol, msgId: uint64, procDef: NimNode) =
let
responseIdent = ident($procDef.name & "Response")
response = Message(protocol: p,
id: Opt.none(uint64),
id: msgId,
ident: responseIdent,
kind: msgResponse,
recName: returnType,
@ -589,7 +579,10 @@ proc createSendProc*(msg: Message,
name = if nameSuffix.len == 0: msg.identWithExportMarker
else: ident($msg.ident & nameSuffix)
pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
dummy = quote do:
proc dummy(): Future[void] {.async: (raises: [CancelledError, EthP2PError], raw: true).}
pragmas = if procType == nnkProcDef: dummy.pragma
else: newEmptyNode()
var def = newNimNode(procType).add(
@ -641,7 +634,7 @@ proc createSendProc*(msg: Message,
of msgNotification:
discard
def[3][0] = if procType == nnkMacroDef:
def[3][0] = if procType in [nnkMacroDef, nnkTemplateDef]:
ident "untyped"
elif msg.kind == msgRequest and not isRawSender:
Fut(msg.requestResultType)
@ -751,9 +744,9 @@ proc netInit*(p: P2PProtocol): NimNode =
p.backend.NetworkType,
p.NetworkStateType)
proc createHandshakeTemplate*(msg: Message,
rawSendProc, handshakeImpl,
nextMsg: NimNode): SendProc =
proc createHandshakeTemplate*(
msg: Message, rawSendProc, handshakeImpl, nextMsg: NimNode
): SendProc =
let
handshakeExchanger = msg.createSendProc(procType = nnkTemplateDef)
forwardCall = newCall(rawSendProc).appendAllInputParams(handshakeExchanger.def)
@ -763,19 +756,13 @@ proc createHandshakeTemplate*(msg: Message,
forwardCall[1] = peerVar
forwardCall.del(forwardCall.len - 1)
let peerVar = genSym(nskLet ,"peer")
let peerVar = genSym(nskLet, "peer")
handshakeExchanger.setBody quote do:
try:
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`(`peerVar`,
sendingFuture,
`nextMsg`(`peerVar`, `msgRecName`),
`timeoutVar`)
except PeerDisconnected as exc:
raise newException(EthP2PError, exc.msg)
except P2PInternalError as exc:
raise newException(EthP2PError, exc.msg)
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`[`msgRecName`](
`peerVar`, sendingFuture, `nextMsg`(`peerVar`, `msgRecName`), `timeoutVar`
)
return handshakeExchanger
@ -844,10 +831,10 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
inc nextId
elif eqIdent(n[0], "onPeerConnected"):
p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected")
p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected", true)
elif eqIdent(n[0], "onPeerDisconnected"):
p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected")
p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected", false)
else:
error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
@ -894,11 +881,8 @@ proc genTypeSection*(p: P2PProtocol): NimNode =
if msg.procDef == nil:
continue
# FIXME: Can `msg.id` be missing, at all?
doAssert msg.id.isSome()
let
msgId = msg.id.value
msgId = msg.id
msgName = msg.ident
msgRecName = msg.recName
msgStrongRecName = msg.strongRecName
@ -983,12 +967,11 @@ macro emitForSingleBackend(
result = p.genCode()
when defined(p2pProtocolDebug):
try:
result.storeMacroResult true
except IOError:
# IO error so the generated nim code might not be stored, don't sweat it.
discard
try:
result.storeMacroResult true
except IOError:
# IO error so the generated nim code might not be stored, don't sweat it.
discard
macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped =
let name = $(options[0])

View File

@ -17,10 +17,7 @@ import
".."/../[rlp], ../../common/[base, keys],
".."/[enode, kademlia, discovery, rlpxtransport]
export base.NetworkId, rlpxtransport
const
useSnappy* = defined(useSnappy)
export base.NetworkId, rlpxtransport, kademlia
type
EthereumNode* = ref object
@ -39,8 +36,6 @@ type
listeningServer*: StreamServer
protocolStates*: seq[RootRef]
discovery*: DiscoveryProtocol
when useSnappy:
protocolVersion*: uint64
rng*: ref HmacDrbgContext
Peer* = ref object
@ -55,8 +50,7 @@ type
protocolStates*: seq[RootRef]
outstandingRequests*: seq[Deque[OutstandingRequest]] # per `msgId` table
awaitedMessages*: seq[FutureBase] # per `msgId` table
when useSnappy:
snappyEnabled*: bool
snappyEnabled*: bool
clientId*: string
SeenNode* = object
@ -119,8 +113,9 @@ type
# Private fields:
peerStateInitializer*: PeerStateInitializer
networkStateInitializer*: NetworkStateInitializer
handshake*: HandshakeStep
disconnectHandler*: DisconnectionHandler
onPeerConnected*: OnPeerConnectedHandler
onPeerDisconnected*: OnPeerDisconnectedHandler
MessageInfo* = ref object
id*: uint64 # this is a `msgId` (as opposed to a `reqId`)
@ -131,6 +126,7 @@ type
printer*: MessageContentPrinter
requestResolver*: RequestResolver
nextMsgResolver*: NextMsgResolver
failResolver*: FailResolver
Dispatcher* = ref object # private
# The dispatcher stores the mapping of negotiated message IDs between
@ -156,13 +152,12 @@ type
OutstandingRequest* = object
id*: uint64 # a `reqId` that may be used for response
future*: FutureBase
timeoutAt*: Moment
# Private types:
MessageHandlerDecorator* = proc(msgId: uint64, n: NimNode): NimNode
ThunkProc* = proc(x: Peer, msgId: uint64, data: Rlp): Future[void]
{.gcsafe, async: (raises: [RlpError, EthP2PError]).}
ThunkProc* = proc(x: Peer, data: Rlp): Future[void]
{.async: (raises: [CancelledError, EthP2PError]).}
MessageContentPrinter* = proc(msg: pointer): string
{.gcsafe, raises: [].}
@ -173,17 +168,20 @@ type
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase)
{.gcsafe, raises: [RlpError].}
FailResolver* = proc(reason: DisconnectionReason, future: FutureBase)
{.gcsafe, raises: [].}
PeerStateInitializer* = proc(peer: Peer): RootRef
{.gcsafe, raises: [].}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef
{.gcsafe, raises: [].}
HandshakeStep* = proc(peer: Peer): Future[void]
{.gcsafe, async: (raises: [EthP2PError]).}
OnPeerConnectedHandler* = proc(peer: Peer): Future[void]
{.async: (raises: [CancelledError, EthP2PError]).}
DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason):
Future[void] {.gcsafe, async: (raises: [EthP2PError]).}
OnPeerDisconnectedHandler* = proc(peer: Peer, reason: DisconnectionReason):
Future[void] {.async: (raises: []).}
ConnectionState* = enum
None,

File diff suppressed because it is too large Load Diff

View File

@ -46,7 +46,8 @@ proc initiatorHandshake(
writeRes = await stream.write(addr authMsg[0], authMsgLen)
if writeRes != authMsgLen:
raise (ref RlpxTransportError)(msg: "Could not write RLPx handshake header")
# TOOD raising a chronos error here is a hack - rework using something else
raise (ref TransportIncompleteError)(msg: "Could not write RLPx handshake header")
var ackMsg = newSeqOfCap[byte](1024)
ackMsg.setLen(MsgLenLenEIP8)
@ -95,7 +96,8 @@ proc responderHandshake(
var res = await stream.write(addr ackMsg[0], ackMsgLen)
if res != ackMsgLen:
raise (ref RlpxTransportError)(msg: "Could not write RLPx ack message")
# TOOD raising a chronos error here is a hack - rework using something else
raise (ref TransportIncompleteError)(msg: "Could not write RLPx ack message")
(handshake.getSecrets(authMsg, ^ackMsg), handshake.remoteHPubkey)
@ -149,11 +151,9 @@ proc recvMsg*(
let msgHeader = decryptHeader(transport.state, msgHeaderEnc).valueOr:
raise (ref RlpxTransportError)(msg: "Cannot decrypt RLPx frame header")
# The capability-id and context id are always zero
# The header has more fields than the size, but they are unused / obsolete.
# Although some clients set them, we don't check this in the spirit of EIP-8
# https://github.com/ethereum/devp2p/blob/5713591d0366da78a913a811c7502d9ca91d29a8/rlpx.md#framing
if (msgHeader[4] != 0x80) or (msgHeader[5] != 0x80):
raise
(ref RlpxTransportError)(msg: "Invalid capability-id/context-id in RLPx header")
let msgSize = msgHeader.getBodySize()
let remainingBytes = encryptedLength(msgSize) - 32
@ -170,7 +170,6 @@ proc recvMsg*(
reset(encryptedBytes) # Release memory (TODO: in-place decryption)
msgBody.setLen(msgSize) # Remove padding
msgBody
proc sendMsg*(
@ -179,7 +178,8 @@ proc sendMsg*(
let cipherText = encryptMsg(data, transport.state)
var res = await transport.stream.write(cipherText)
if res != len(cipherText):
raise (ref RlpxTransportError)(msg: "Could not complete writing message")
# TOOD raising a chronos error here is a hack - rework using something else
raise (ref TransportIncompleteError)(msg: "Could not complete writing message")
proc remoteAddress*(
transport: RlpxTransport

View File

@ -31,8 +31,6 @@ p2pProtocol abc(version = 1,
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.count -= 1
if true:
raise newException(CatchableError, "Fake abc exception")
p2pProtocol xyz(version = 1,
rlpxName = "xyz",
@ -45,8 +43,6 @@ p2pProtocol xyz(version = 1,
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.count -= 1
if true:
raise newException(CatchableError, "Fake xyz exception")
peer.state.status = "disconnected"
p2pProtocol hah(version = 1,
@ -77,11 +73,10 @@ suite "Testing protocol handlers":
await peer.disconnect(SubprotocolReason, true)
check:
# we want to check that even though the exceptions in the disconnect
# handlers, each handler still ran
# all disconnection handlers are called
node1.protocolState(abc).count == 0
node1.protocolState(xyz).count == 0
peer.state(xyz).status == "connected"
peer.state(xyz).status == "disconnected"
asyncTest "Failing connection handler":
let rng = newRng()

View File

@ -1,7 +1,7 @@
{
"Invalid list when decoding for object": {
"payload": "03",
"error": "MalformedRlpError",
"error": "PeerDisconnected",
"description": "Object parameters are expected to be encoded in an RLP list"
},
"Message id that is not supported": {
@ -21,62 +21,62 @@
},
"No Hash nor Status, but empty list": {
"payload": "20c1c0",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Decoding to HashOrStatus expects blob of size 1 or 32"
},
"No Hash nor Status, list instead of blob": {
"payload": "20c2c1c0",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Decoding to HashOrStatus expects blob of size 1 or 32"
},
"No Hash nor Status, blob of 2 bytes": {
"payload": "20c4c3820011",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Decoding to HashOrStatus expects blob of size 1 or 32"
},
"No Hash nor Status, blob of 33 bytes": {
"payload": "20e3e2a100112233445566778899aabbccddeeff00112233445566778899aabbcceeddff33",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Decoding to HashOrStatus expects blob of size 1 or 32"
},
"Listing elements when no data": {
"payload": "01e1",
"error": "MalformedRlpError",
"error": "PeerDisconnected",
"description": "listElem to error on empty list"
},
"Listing elements when invalid length": {
"payload": "01ffdada",
"error": "MalformedRlpError",
"error": "PeerDisconnected",
"description": "listElem to error on invalid size encoding"
},
"Listing single element list when having more entries": {
"payload": "01c20420",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "listElem to assert on not a single entry list"
},
"Listing single element list when having empty list": {
"payload": "01c0",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "listElem to assert on not a single entry list"
},
"DisconnectReason: single element list with entry out off enum range": {
"payload": "01c111",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Disconnect reason code out of bounds 0..16 (got: 17)"
},
"DisconnectReason: single element out off enum range": {
"payload": "0111",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Disconnect reason code out of bounds 0..16 (got: 17)"
},
"DisconnectReason: single element list with enum hole value": {
"payload": "01c10C",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Error on Disconnect reason with enum hole value"
},
"DisconnectReason: single element with enum hole value": {
"payload": "010C",
"error": "RlpTypeMismatch",
"error": "PeerDisconnected",
"description": "Error on Disconnect reason with enum hole value"
},
"devp2p hello packet version 22 + additional list elements for EIP-8": {