mirror of https://github.com/status-im/nim-eth.git
Using unsigned types for message type and requst IDs (#722)
* Using unsigned types for message type and requst IDs why: Negative values are neither defined for RLP nor in the protocol specs which refer to the RLPs (see yellow paper app B clause (199). * Fix `int` argument (must be `uint`) in fuzzing tests why: Not part of all tests so it slipped through.
This commit is contained in:
parent
b874e12516
commit
5ecbcb5886
|
@ -1,7 +1,30 @@
|
|||
# nim-eth
|
||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
## This module defines a DSL for constructing communication message packet
|
||||
## drivers to support the ones as defined with the Ethereum p2p protocols
|
||||
## (see `devp2p <https://github.com/ethereum/devp2p/tree/master>`_.)
|
||||
##
|
||||
## Particular restictions apply to intrinsic message entities as message
|
||||
## ID and request/response ID. Both must be unsigned. This is a consequence
|
||||
## of message packets being serialised using RLP which is defined for
|
||||
## non-negative scalar values only, see
|
||||
## `Yellow Paper <https://ethereum.github.io/yellowpaper/paper.pdf#appendix.B>`_,
|
||||
## Appx B, clauses (195) ff. and (199).)
|
||||
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, sequtils, macrocache],
|
||||
results,
|
||||
stew/shims/macros, chronos, faststreams/outputs
|
||||
|
||||
type
|
||||
|
@ -12,7 +35,7 @@ type
|
|||
msgResponse
|
||||
|
||||
Message* = ref object
|
||||
id*: int
|
||||
id*: Opt[uint]
|
||||
ident*: NimNode
|
||||
kind*: MessageKind
|
||||
procDef*: NimNode
|
||||
|
@ -126,7 +149,7 @@ let
|
|||
# Variable names affecting the public interface of the library:
|
||||
reqIdVar* {.compileTime.} = ident "reqId"
|
||||
# XXX: Binding the int type causes instantiation failure for some reason
|
||||
ReqIdType* {.compileTime.} = ident "int"
|
||||
ReqIdType* {.compileTime.} = ident "uint"
|
||||
peerVar* {.compileTime.} = ident "peer"
|
||||
responseVar* {.compileTime.} = ident "response"
|
||||
streamVar* {.compileTime.} = ident "stream"
|
||||
|
@ -322,14 +345,14 @@ proc init*(T: type P2PProtocol, backendFactory: BackendFactory,
|
|||
assert(not result.backend.implementProtocolInit.isNil)
|
||||
|
||||
if result.backend.ReqIdType.isNil:
|
||||
result.backend.ReqIdType = ident "int"
|
||||
result.backend.ReqIdType = ident "uint"
|
||||
|
||||
result.processProtocolBody body
|
||||
|
||||
if not result.backend.afterProtocolInit.isNil:
|
||||
result.backend.afterProtocolInit(result)
|
||||
|
||||
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
|
||||
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = Opt.none(uint)) =
|
||||
## This procs adds a set of common helpers available in all messages handlers
|
||||
## (e.g. `perProtocolMsgId`, `peer.state`, etc).
|
||||
|
||||
|
@ -363,7 +386,8 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
|
|||
prelude.add quote do:
|
||||
type `currentProtocolSym` {.used.} = `protocolNameIdent`
|
||||
|
||||
if msgId >= 0 and p.isRlpx:
|
||||
if msgId.isSome and p.isRlpx:
|
||||
let msgId = msgId.value
|
||||
prelude.add quote do:
|
||||
const `perProtocolMsgIdVar` {.used.} = `msgId`
|
||||
|
||||
|
@ -422,7 +446,7 @@ proc ResponderType(msg: Message): NimNode =
|
|||
proc needsSingleParamInlining(msg: Message): bool =
|
||||
msg.recBody.kind == nnkDistinctTy
|
||||
|
||||
proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
|
||||
proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint,
|
||||
procDef: NimNode, response: Message = nil): Message =
|
||||
|
||||
if procDef[0].kind == nnkPostfix:
|
||||
|
@ -454,7 +478,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
|
|||
recBody = newTree(nnkDistinctTy, recName)
|
||||
|
||||
result = Message(protocol: protocol,
|
||||
id: id,
|
||||
id: Opt.some(msgId),
|
||||
ident: msgIdent,
|
||||
kind: kind,
|
||||
procDef: procDef,
|
||||
|
@ -466,7 +490,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
|
|||
if procDef.body.kind != nnkEmpty:
|
||||
var userHandler = copy procDef
|
||||
|
||||
protocol.augmentUserHandler userHandler, id
|
||||
protocol.augmentUserHandler userHandler, Opt.some(msgId)
|
||||
userHandler.name = ident(msgName & "UserHandler")
|
||||
|
||||
# Request and Response handlers get an extra `reqId` parameter if the
|
||||
|
@ -504,7 +528,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
|
|||
proc isVoid(t: NimNode): bool =
|
||||
t.kind == nnkEmpty or eqIdent(t, "void")
|
||||
|
||||
proc addMsg(p: P2PProtocol, id: int, procDef: NimNode) =
|
||||
proc addMsg(p: P2PProtocol, msgId: uint, procDef: NimNode) =
|
||||
var
|
||||
returnType = procDef.params[0]
|
||||
hasReturnValue = not isVoid(returnType)
|
||||
|
@ -520,7 +544,7 @@ proc addMsg(p: P2PProtocol, id: int, procDef: NimNode) =
|
|||
let
|
||||
responseIdent = ident($procDef.name & "Response")
|
||||
response = Message(protocol: p,
|
||||
id: -1, # TODO: Implement the message IDs in RLPx-specific way
|
||||
id: Opt.none(uint),
|
||||
ident: responseIdent,
|
||||
kind: msgResponse,
|
||||
recName: returnType,
|
||||
|
@ -529,10 +553,10 @@ proc addMsg(p: P2PProtocol, id: int, procDef: NimNode) =
|
|||
outputParamDef: outputParam)
|
||||
|
||||
p.messages.add response
|
||||
let msg = p.newMsg(msgRequest, id, procDef, response = response)
|
||||
let msg = p.newMsg(msgRequest, msgId, procDef, response = response)
|
||||
p.requests.add Request(queries: @[msg], response: response)
|
||||
else:
|
||||
p.notifications.add p.newMsg(msgNotification, id, procDef)
|
||||
p.notifications.add p.newMsg(msgNotification, msgId, procDef)
|
||||
|
||||
proc identWithExportMarker*(msg: Message): NimNode =
|
||||
newTree(nnkPostfix, ident("*"), msg.ident)
|
||||
|
@ -847,7 +871,7 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
|
|||
##
|
||||
## All messages will have properly computed numeric IDs
|
||||
##
|
||||
var nextId = 0
|
||||
var nextId = 0u
|
||||
|
||||
for n in protocolBody:
|
||||
case n.kind
|
||||
|
@ -856,7 +880,7 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
|
|||
# By default message IDs are assigned in increasing order
|
||||
# `nextID` can be used to skip some of the numeric slots
|
||||
if n.len == 2 and n[1].kind == nnkIntLit:
|
||||
nextId = n[1].intVal.int
|
||||
nextId = n[1].intVal.uint
|
||||
else:
|
||||
error("nextID expects a single int value", n)
|
||||
|
||||
|
@ -871,10 +895,10 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
|
|||
error "requestResponse expects a block with at least two proc definitions"
|
||||
|
||||
var queries = newSeq[Message]()
|
||||
let responseMsg = p.newMsg(msgResponse, nextId + procs.len - 1, procs[^1])
|
||||
let responseMsg = p.newMsg(msgResponse, nextId + procs.len.uint - 1, procs[^1])
|
||||
|
||||
for i in 0 .. procs.len - 2:
|
||||
queries.add p.newMsg(msgRequest, nextId + i, procs[i], response = responseMsg)
|
||||
queries.add p.newMsg(msgRequest, nextId + i.uint, procs[i], response = responseMsg)
|
||||
|
||||
p.requests.add Request(queries: queries, response: responseMsg)
|
||||
|
||||
|
@ -942,8 +966,11 @@ proc genTypeSection*(p: P2PProtocol): NimNode =
|
|||
if msg.procDef == nil:
|
||||
continue
|
||||
|
||||
# FIXME: Can `msg.id` be missing, at all?
|
||||
doAssert msg.id.isSome()
|
||||
|
||||
let
|
||||
msgId = msg.id
|
||||
msgId = msg.id.value
|
||||
msgName = msg.ident
|
||||
msgRecName = msg.recName
|
||||
msgStrongRecName = msg.strongRecName
|
||||
|
@ -964,7 +991,7 @@ proc genTypeSection*(p: P2PProtocol): NimNode =
|
|||
|
||||
if p.isRlpx:
|
||||
result.add quote do:
|
||||
template msgId*(`MSG`: type `msgStrongRecName`): int = `msgId`
|
||||
template msgId*(`MSG`: type `msgStrongRecName`): uint = `msgId`
|
||||
|
||||
proc genCode*(p: P2PProtocol): NimNode =
|
||||
for msg in p.messages:
|
||||
|
|
|
@ -1,15 +1,19 @@
|
|||
# nim-eth
|
||||
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[deques, tables],
|
||||
chronos,
|
||||
results,
|
||||
".."/../[rlp, keys], ../../common/eth_types,
|
||||
".."/[enode, kademlia, discovery, rlpxcrypt]
|
||||
|
||||
|
@ -46,12 +50,12 @@ type
|
|||
# Private fields:
|
||||
transport*: StreamTransport
|
||||
dispatcher*: Dispatcher
|
||||
lastReqId*: int
|
||||
lastReqId*: Opt[uint]
|
||||
secretsState*: SecretState
|
||||
connectionState*: ConnectionState
|
||||
protocolStates*: seq[RootRef]
|
||||
outstandingRequests*: seq[Deque[OutstandingRequest]]
|
||||
awaitedMessages*: seq[FutureBase]
|
||||
outstandingRequests*: seq[Deque[OutstandingRequest]] # per `msgId` table
|
||||
awaitedMessages*: seq[FutureBase] # per `msgId` table
|
||||
when useSnappy:
|
||||
snappyEnabled*: bool
|
||||
|
||||
|
@ -120,7 +124,7 @@ type
|
|||
disconnectHandler*: DisconnectionHandler
|
||||
|
||||
MessageInfo* = ref object
|
||||
id*: int
|
||||
id*: uint # this is a `msgId` (as opposed to a `reqId`)
|
||||
name*: string
|
||||
|
||||
# Private fields:
|
||||
|
@ -138,12 +142,12 @@ type
|
|||
# protocol. If the other peer also supports the protocol, the stored
|
||||
# offset indicates the numeric value of the first message of the protocol
|
||||
# (for this particular connection). If the other peer doesn't support the
|
||||
# particular protocol, the stored offset is -1.
|
||||
# particular protocol, the stored offset is `Opt.none(uint)`.
|
||||
#
|
||||
# `messages` holds a mapping from valid message IDs to their handler procs.
|
||||
#
|
||||
protocolOffsets*: seq[int]
|
||||
messages*: seq[MessageInfo]
|
||||
protocolOffsets*: seq[Opt[uint]]
|
||||
messages*: seq[MessageInfo] # per `msgId` table (se above)
|
||||
activeProtocols*: seq[ProtocolInfo]
|
||||
|
||||
##
|
||||
|
@ -151,14 +155,14 @@ type
|
|||
##
|
||||
|
||||
OutstandingRequest* = object
|
||||
id*: int
|
||||
id*: uint # a `reqId` that may be used for response
|
||||
future*: FutureBase
|
||||
timeoutAt*: Moment
|
||||
|
||||
# Private types:
|
||||
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
|
||||
MessageHandlerDecorator* = proc(msgId: uint, n: NimNode): NimNode
|
||||
|
||||
ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void]
|
||||
ThunkProc* = proc(x: Peer, msgId: uint, data: Rlp): Future[void]
|
||||
{.gcsafe, async: (raises: [RlpError, EthP2PError]).}
|
||||
|
||||
MessageContentPrinter* = proc(msg: pointer): string
|
||||
|
|
187
eth/p2p/rlpx.nim
187
eth/p2p/rlpx.nim
|
@ -1,9 +1,26 @@
|
|||
# nim-eth
|
||||
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
## This module implements the `RLPx` Transport Protocol defined at
|
||||
## `RLPx <https://github.com/ethereum/devp2p/blob/master/rlpx.md>`_.
|
||||
##
|
||||
## Use NIM command line optipn `-d:p2pProtocolDebug` for dumping the
|
||||
## generated driver code (just to have it stored somewhere lest one forgets.)
|
||||
##
|
||||
## Both, the message ID and the request/response ID are now unsigned. This goes
|
||||
## along with the RLPx specs (see above) and the sub-protocol specs at
|
||||
## `sub-proto <https://github.com/ethereum/devp2p/tree/master/caps>`_ plus the
|
||||
## fact that RLP is defined for non-negative integers smaller than 2^64 only at
|
||||
## `Yellow Paper <https://ethereum.github.io/yellowpaper/paper.pdf#appendix.B>`_,
|
||||
## Appx B, clauses (195) ff and (199).
|
||||
##
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
|
@ -47,7 +64,7 @@ logScope:
|
|||
type
|
||||
ResponderWithId*[MsgType] = object
|
||||
peer*: Peer
|
||||
reqId*: int
|
||||
reqId*: uint
|
||||
|
||||
ResponderWithoutId*[MsgType] = distinct Peer
|
||||
|
||||
|
@ -107,13 +124,14 @@ when tracingEnabled:
|
|||
init, writeValue, getOutput
|
||||
|
||||
proc init*[MsgName](T: type ResponderWithId[MsgName],
|
||||
peer: Peer, reqId: int): T =
|
||||
peer: Peer, reqId: uint): T =
|
||||
T(peer: peer, reqId: reqId)
|
||||
|
||||
proc init*[MsgName](T: type ResponderWithoutId[MsgName], peer: Peer): T =
|
||||
T(peer)
|
||||
|
||||
chronicles.formatIt(Peer): $(it.remote)
|
||||
chronicles.formatIt(Opt[uint]): (if it.isSome(): $it.value else: "-1")
|
||||
|
||||
include p2p_backends_helpers
|
||||
|
||||
|
@ -227,9 +245,9 @@ proc getDispatcher(node: EthereumNode,
|
|||
|
||||
new result
|
||||
newSeq(result.protocolOffsets, protocolCount())
|
||||
result.protocolOffsets.fill -1
|
||||
result.protocolOffsets.fill Opt.none(uint)
|
||||
|
||||
var nextUserMsgId = 0x10
|
||||
var nextUserMsgId = 0x10u
|
||||
|
||||
for localProtocol in node.protocols:
|
||||
let idx = localProtocol.index
|
||||
|
@ -237,8 +255,8 @@ proc getDispatcher(node: EthereumNode,
|
|||
for remoteCapability in otherPeerCapabilities:
|
||||
if localProtocol.name == remoteCapability.name and
|
||||
localProtocol.version == remoteCapability.version:
|
||||
result.protocolOffsets[idx] = nextUserMsgId
|
||||
nextUserMsgId += localProtocol.messages.len
|
||||
result.protocolOffsets[idx] = Opt.some(nextUserMsgId)
|
||||
nextUserMsgId += localProtocol.messages.len.uint
|
||||
break findMatchingProtocol
|
||||
|
||||
template copyTo(src, dest; index: int) =
|
||||
|
@ -250,14 +268,14 @@ proc getDispatcher(node: EthereumNode,
|
|||
|
||||
for localProtocol in node.protocols:
|
||||
let idx = localProtocol.index
|
||||
if result.protocolOffsets[idx] != -1:
|
||||
if result.protocolOffsets[idx].isSome:
|
||||
result.activeProtocols.add localProtocol
|
||||
localProtocol.messages.copyTo(result.messages,
|
||||
result.protocolOffsets[idx])
|
||||
result.protocolOffsets[idx].value.int)
|
||||
|
||||
proc getMsgName*(peer: Peer, msgId: int): string =
|
||||
proc getMsgName*(peer: Peer, msgId: uint): string =
|
||||
if not peer.dispatcher.isNil and
|
||||
msgId < peer.dispatcher.messages.len and
|
||||
msgId < peer.dispatcher.messages.len.uint and
|
||||
not peer.dispatcher.messages[msgId].isNil:
|
||||
return peer.dispatcher.messages[msgId].name
|
||||
else:
|
||||
|
@ -268,20 +286,20 @@ proc getMsgName*(peer: Peer, msgId: int): string =
|
|||
of 3: "pong"
|
||||
else: $msgId
|
||||
|
||||
proc getMsgMetadata*(peer: Peer, msgId: int): (ProtocolInfo, MessageInfo) =
|
||||
proc getMsgMetadata*(peer: Peer, msgId: uint): (ProtocolInfo, MessageInfo) =
|
||||
doAssert msgId >= 0
|
||||
|
||||
let dpInfo = devp2pInfo()
|
||||
if msgId <= dpInfo.messages[^1].id:
|
||||
return (dpInfo, dpInfo.messages[msgId])
|
||||
|
||||
if msgId < peer.dispatcher.messages.len:
|
||||
if msgId < peer.dispatcher.messages.len.uint:
|
||||
let numProtocol = protocolCount()
|
||||
for i in 0 ..< numProtocol:
|
||||
let protocol = getProtocol(i)
|
||||
let offset = peer.dispatcher.protocolOffsets[i]
|
||||
if offset != -1 and
|
||||
offset + protocol.messages[^1].id >= msgId:
|
||||
if offset.isSome and
|
||||
offset.value + protocol.messages[^1].id >= msgId:
|
||||
return (protocol, peer.dispatcher.messages[msgId])
|
||||
|
||||
# Protocol info objects
|
||||
|
@ -318,43 +336,45 @@ proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase)
|
|||
MsgType.rlpFieldsCount > 1)
|
||||
|
||||
proc registerMsg(protocol: ProtocolInfo,
|
||||
id: int, name: string,
|
||||
msgId: uint,
|
||||
name: string,
|
||||
thunk: ThunkProc,
|
||||
printer: MessageContentPrinter,
|
||||
requestResolver: RequestResolver,
|
||||
nextMsgResolver: NextMsgResolver) =
|
||||
if protocol.messages.len <= id:
|
||||
protocol.messages.setLen(id + 1)
|
||||
protocol.messages[id] = MessageInfo(id: id,
|
||||
name: name,
|
||||
thunk: thunk,
|
||||
printer: printer,
|
||||
requestResolver: requestResolver,
|
||||
nextMsgResolver: nextMsgResolver)
|
||||
if protocol.messages.len.uint <= msgId:
|
||||
protocol.messages.setLen(msgId + 1)
|
||||
protocol.messages[msgId] = MessageInfo(
|
||||
id: msgId,
|
||||
name: name,
|
||||
thunk: thunk,
|
||||
printer: printer,
|
||||
requestResolver: requestResolver,
|
||||
nextMsgResolver: nextMsgResolver)
|
||||
|
||||
# Message composition and encryption
|
||||
#
|
||||
|
||||
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int =
|
||||
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: uint): uint =
|
||||
result = msgId
|
||||
if not peer.dispatcher.isNil:
|
||||
result += peer.dispatcher.protocolOffsets[proto.index]
|
||||
result += peer.dispatcher.protocolOffsets[proto.index].value
|
||||
|
||||
template getPeer(peer: Peer): auto = peer
|
||||
template getPeer(responder: ResponderWithId): auto = responder.peer
|
||||
template getPeer(responder: ResponderWithoutId): auto = Peer(responder)
|
||||
|
||||
proc supports*(peer: Peer, proto: ProtocolInfo): bool =
|
||||
peer.dispatcher.protocolOffsets[proto.index] != -1
|
||||
peer.dispatcher.protocolOffsets[proto.index].isSome
|
||||
|
||||
proc supports*(peer: Peer, Protocol: type): bool =
|
||||
## Checks whether a Peer supports a particular protocol
|
||||
peer.supports(Protocol.protocolInfo)
|
||||
|
||||
template perPeerMsgId(peer: Peer, MsgType: type): int =
|
||||
template perPeerMsgId(peer: Peer, MsgType: type): uint =
|
||||
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
|
||||
|
||||
proc invokeThunk*(peer: Peer, msgId: int, msgData: Rlp): Future[void]
|
||||
proc invokeThunk*(peer: Peer, msgId: uint, msgData: Rlp): Future[void]
|
||||
{.async: (raises: [rlp.RlpError, EthP2PError]).} =
|
||||
template invalidIdError: untyped =
|
||||
raise newException(UnsupportedMessageError,
|
||||
|
@ -362,7 +382,7 @@ proc invokeThunk*(peer: Peer, msgId: int, msgData: Rlp): Future[void]
|
|||
" on a connection supporting " & peer.dispatcher.describeProtocols)
|
||||
|
||||
# msgId can be negative as it has int as type and gets decoded from rlp
|
||||
if msgId >= peer.dispatcher.messages.len or msgId < 0: invalidIdError()
|
||||
if msgId >= peer.dispatcher.messages.len.uint: invalidIdError()
|
||||
if peer.dispatcher.messages[msgId].isNil: invalidIdError()
|
||||
|
||||
let thunk = peer.dispatcher.messages[msgId].thunk
|
||||
|
@ -401,9 +421,9 @@ proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
|
|||
proc registerRequest(peer: Peer,
|
||||
timeout: Duration,
|
||||
responseFuture: FutureBase,
|
||||
responseMsgId: int): int =
|
||||
inc peer.lastReqId
|
||||
result = peer.lastReqId
|
||||
responseMsgId: uint): uint =
|
||||
result = if peer.lastReqId.isNone: 0u else: peer.lastReqId.value + 1u
|
||||
peer.lastReqId = Opt.some(result)
|
||||
|
||||
let timeoutAt = Moment.fromNow(timeout)
|
||||
let req = OutstandingRequest(id: result,
|
||||
|
@ -418,11 +438,15 @@ proc registerRequest(peer: Peer,
|
|||
|
||||
discard setTimer(timeoutAt, timeoutExpired, nil)
|
||||
|
||||
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
||||
proc resolveResponseFuture(peer: Peer, msgId: uint, msg: pointer) =
|
||||
## Split off from the non request ID version from the originally combined
|
||||
## `resolveResponseFuture()` function. This seems cleaner to handle with macros
|
||||
## than a `int` or `Opt[uint]` request ID argument (yes, there is a second part
|
||||
## below.).
|
||||
logScope:
|
||||
msg = peer.dispatcher.messages[msgId].name
|
||||
msgContents = peer.dispatcher.messages[msgId].printer(msg)
|
||||
receivedReqId = reqId
|
||||
receivedReqId = -1
|
||||
remotePeer = peer.remote
|
||||
|
||||
template resolve(future) =
|
||||
|
@ -431,7 +455,7 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
|||
template outstandingReqs: auto =
|
||||
peer.outstandingRequests[msgId]
|
||||
|
||||
if reqId == -1:
|
||||
block: # no request ID
|
||||
# XXX: This is a response from an ETH-like protocol that doesn't feature
|
||||
# request IDs. Handling the response is quite tricky here because this may
|
||||
# be a late response to an already timed out request or a valid response
|
||||
|
@ -455,7 +479,22 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
|||
resolve oldestReq.future
|
||||
else:
|
||||
trace "late or duplicate reply for a RLPx request"
|
||||
else:
|
||||
|
||||
proc resolveResponseFuture(peer: Peer, msgId: uint, msg: pointer, reqId: uint) =
|
||||
## Variant of `resolveResponseFuture()` for request ID argument.
|
||||
logScope:
|
||||
msg = peer.dispatcher.messages[msgId].name
|
||||
msgContents = peer.dispatcher.messages[msgId].printer(msg)
|
||||
receivedReqId = reqId
|
||||
remotePeer = peer.remote
|
||||
|
||||
template resolve(future) =
|
||||
(peer.dispatcher.messages[msgId].requestResolver)(msg, future)
|
||||
|
||||
template outstandingReqs: auto =
|
||||
peer.outstandingRequests[msgId]
|
||||
|
||||
block: # have request ID
|
||||
# TODO: This is not completely sound because we are still using a global
|
||||
# `reqId` sequence (the problem is that we might get a response ID that
|
||||
# matches a request ID for a different type of request). To make the code
|
||||
|
@ -464,7 +503,7 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
|||
# correctly (because then, we'll be reusing the same reqIds for different
|
||||
# types of requests). Alternatively, we can assign a separate interval in
|
||||
# the `reqId` space for each type of response.
|
||||
if reqId > peer.lastReqId:
|
||||
if peer.lastReqId.isNone or reqId > peer.lastReqId.value:
|
||||
warn "RLPx response without a matching request"
|
||||
return
|
||||
|
||||
|
@ -500,7 +539,7 @@ proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
|||
debug "late or duplicate reply for a RLPx request"
|
||||
|
||||
|
||||
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
||||
proc recvMsg*(peer: Peer): Future[tuple[msgId: uint, msgData: Rlp]] {.async.} =
|
||||
## This procs awaits the next complete RLPx message in the TCP stream
|
||||
|
||||
var headerBytes: array[32, byte]
|
||||
|
@ -562,11 +601,11 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
|||
|
||||
var rlp = rlpFromBytes(decryptedBytes)
|
||||
|
||||
var msgId: int32
|
||||
var msgId: uint32
|
||||
try:
|
||||
# int32 as this seems more than big enough for the amount of msgIds
|
||||
msgId = rlp.read(int32)
|
||||
result = (msgId.int, rlp)
|
||||
# uint32 as this seems more than big enough for the amount of msgIds
|
||||
msgId = rlp.read(uint32)
|
||||
result = (msgId.uint, rlp)
|
||||
except RlpError:
|
||||
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||
"Cannot read RLPx message id")
|
||||
|
@ -635,7 +674,7 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
|
|||
# message handler code as the TODO mentions already.
|
||||
proc dispatchMessages*(peer: Peer) {.async.} =
|
||||
while peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
var msgId: int
|
||||
var msgId: uint
|
||||
var msgData: Rlp
|
||||
try:
|
||||
(msgId, msgData) = await peer.recvMsg()
|
||||
|
@ -676,7 +715,7 @@ proc dispatchMessages*(peer: Peer) {.async.} =
|
|||
# The documentation will need to be updated, explaining the fact that
|
||||
# nextMsg will be resolved only if the message handler has executed
|
||||
# successfully.
|
||||
if msgId >= 0 and msgId < peer.awaitedMessages.len and
|
||||
if msgId < peer.awaitedMessages.len.uint and
|
||||
peer.awaitedMessages[msgId] != nil:
|
||||
let msgInfo = peer.dispatcher.messages[msgId]
|
||||
try:
|
||||
|
@ -738,12 +777,15 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
else: ResponderWithoutId
|
||||
|
||||
result.implementMsg = proc (msg: Message) =
|
||||
# FIXME: Or is it already assured that `msgId` is available?
|
||||
doAssert msg.id.isSome
|
||||
|
||||
var
|
||||
msgId = msg.id
|
||||
msgIdValue = msg.id.value
|
||||
msgIdent = msg.ident
|
||||
msgName = $msgIdent
|
||||
msgRecName = msg.recName
|
||||
responseMsgId = if msg.response != nil: msg.response.id else: -1
|
||||
responseMsgId = if msg.response.isNil: Opt.none(uint) else: msg.response.id
|
||||
hasReqId = msg.hasReqId
|
||||
protocol = msg.protocol
|
||||
|
||||
|
@ -764,11 +806,13 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
if hasReqId:
|
||||
# Messages using request Ids
|
||||
readParams.add quote do:
|
||||
let `reqIdVar` = `read`(`receivedRlp`, int)
|
||||
let `reqIdVar` = `read`(`receivedRlp`, uint)
|
||||
|
||||
case msg.kind
|
||||
of msgRequest:
|
||||
let reqToResponseOffset = responseMsgId - msgId
|
||||
doAssert responseMsgId.isSome
|
||||
|
||||
let reqToResponseOffset = responseMsgId.value - msgIdValue
|
||||
let responseMsgId = quote do: `perPeerMsgIdVar` + `reqToResponseOffset`
|
||||
|
||||
# Each request is registered so we can resolve it when the response
|
||||
|
@ -814,14 +858,20 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
when tracingEnabled:
|
||||
readParams.add newCall(bindSym"logReceivedMsg", peerVar, receivedMsg)
|
||||
|
||||
let callResolvedResponseFuture = if msg.kind == msgResponse:
|
||||
newCall(resolveResponseFuture,
|
||||
peerVar,
|
||||
newCall(perPeerMsgId, peerVar, msgRecName),
|
||||
newCall("addr", receivedMsg),
|
||||
if hasReqId: reqIdVar else: newLit(-1))
|
||||
else:
|
||||
newStmtList()
|
||||
let callResolvedResponseFuture =
|
||||
if msg.kind != msgResponse:
|
||||
newStmtList()
|
||||
elif hasReqId:
|
||||
newCall(resolveResponseFuture,
|
||||
peerVar,
|
||||
newCall(perPeerMsgId, peerVar, msgRecName),
|
||||
newCall("addr", receivedMsg),
|
||||
reqIdVar)
|
||||
else:
|
||||
newCall(resolveResponseFuture,
|
||||
peerVar,
|
||||
newCall(perPeerMsgId, peerVar, msgRecName),
|
||||
newCall("addr", receivedMsg))
|
||||
|
||||
var userHandlerParams = @[peerVar]
|
||||
if hasReqId: userHandlerParams.add reqIdVar
|
||||
|
@ -831,7 +881,7 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
thunkName = ident(msgName & "Thunk")
|
||||
|
||||
msg.defineThunk quote do:
|
||||
proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp)
|
||||
proc `thunkName`(`peerVar`: `Peer`, _: uint, data: Rlp)
|
||||
# Fun error if you just use `RlpError` instead of `rlp.RlpError`:
|
||||
# "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'"
|
||||
{.async: (raises: [rlp.RlpError, EthP2PError]).} =
|
||||
|
@ -864,9 +914,9 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
quote: return `sendCall`
|
||||
|
||||
let perPeerMsgIdValue = if isSubprotocol:
|
||||
newCall(perPeerMsgIdImpl, peerVar, protocol.protocolInfo, newLit(msgId))
|
||||
newCall(perPeerMsgIdImpl, peerVar, protocol.protocolInfo, newLit(msgIdValue))
|
||||
else:
|
||||
newLit(msgId)
|
||||
newLit(msgIdValue)
|
||||
|
||||
if paramCount > 1:
|
||||
# In case there are more than 1 parameter,
|
||||
|
@ -880,9 +930,8 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
|
||||
let initWriter = quote do:
|
||||
var `rlpWriter` = `initRlpWriter`()
|
||||
const `perProtocolMsgIdVar` {.used.} = `msgId`
|
||||
const `perProtocolMsgIdVar` {.used.} = `msgIdValue`
|
||||
let `perPeerMsgIdVar` = `perPeerMsgIdValue`
|
||||
# TODO: rlpx should error if perPeerMsgIdVar is signed
|
||||
`append`(`rlpWriter`, `perPeerMsgIdVar`)
|
||||
|
||||
when tracingEnabled:
|
||||
|
@ -902,7 +951,7 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
|||
protocol.outProcRegistrations.add(
|
||||
newCall(registerMsg,
|
||||
protocolVar,
|
||||
newLit(msgId),
|
||||
newLit(msgIdValue),
|
||||
newLit(msgName),
|
||||
thunkName,
|
||||
newTree(nnkBracketExpr, messagePrinter, msgRecName),
|
||||
|
@ -1026,7 +1075,7 @@ proc initPeerState*(peer: Peer, capabilities: openArray[Capability])
|
|||
# Similarly, we need a bit of book-keeping data to keep track
|
||||
# of the potentially concurrent calls to `nextMsg`.
|
||||
peer.awaitedMessages.newSeq(peer.dispatcher.messages.len)
|
||||
peer.lastReqId = 0
|
||||
peer.lastReqId = Opt.some(0u)
|
||||
peer.initProtocolStates peer.dispatcher.activeProtocols
|
||||
|
||||
proc postHelloSteps(peer: Peer, h: DevP2P.hello) {.async.} =
|
||||
|
@ -1416,10 +1465,10 @@ when isMainModule:
|
|||
# are considered GcSafe. The short answer is that they aren't, because
|
||||
# they dispatch into user code that might use the GC.
|
||||
type
|
||||
GcSafeDispatchMsg = proc (peer: Peer, msgId: int, msgData: var Rlp)
|
||||
GcSafeDispatchMsg = proc (peer: Peer, msgId: uint, msgData: var Rlp)
|
||||
|
||||
GcSafeRecvMsg = proc (peer: Peer):
|
||||
Future[tuple[msgId: int, msgData: Rlp]] {.gcsafe.}
|
||||
Future[tuple[msgId: uint, msgData: Rlp]] {.gcsafe.}
|
||||
|
||||
GcSafeAccept = proc (transport: StreamTransport, myKeys: KeyPair):
|
||||
Future[Peer] {.gcsafe.}
|
||||
|
|
|
@ -29,6 +29,6 @@ test:
|
|||
# because of undeterministic behaviour due to usage of network/async.
|
||||
try:
|
||||
var (msgId, msgData) = recvMsgMock(payload)
|
||||
waitFor peer.invokeThunk(msgId.int, msgData)
|
||||
waitFor peer.invokeThunk(msgId, msgData)
|
||||
except CatchableError as e:
|
||||
debug "Test caused CatchableError", exception=e.name, trace=e.repr, msg=e.msg
|
||||
|
|
|
@ -1,3 +1,13 @@
|
|||
# nim-eth
|
||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
import
|
||||
std/strutils,
|
||||
chronos,
|
||||
|
@ -28,8 +38,8 @@ proc setupTestNode*(
|
|||
|
||||
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
|
||||
proc recvMsgMock*(msg: openArray[byte]): tuple[msgId: int, msgData: Rlp] =
|
||||
proc recvMsgMock*(msg: openArray[byte]): tuple[msgId: uint, msgData: Rlp] =
|
||||
var rlp = rlpFromBytes(msg)
|
||||
|
||||
let msgId = rlp.read(int32)
|
||||
return (msgId.int, rlp)
|
||||
let msgId = rlp.read(uint32)
|
||||
return (msgId.uint, rlp)
|
||||
|
|
|
@ -9,14 +9,14 @@
|
|||
"error": "UnsupportedMessageError",
|
||||
"description": "This is a message id not used by devp2p or eth"
|
||||
},
|
||||
"Message id that is bigger than int32": {
|
||||
"Message id that is bigger than uint32": {
|
||||
"payload": "888888888888888888",
|
||||
"error": "UnsupportedRlpError",
|
||||
"description": "This payload will result in too large int for a message id"
|
||||
},
|
||||
"Message id that is negative": {
|
||||
"payload": "8488888888",
|
||||
"error": "UnsupportedRlpError",
|
||||
"Unsupported big message id": {
|
||||
"payload": "848888888888",
|
||||
"error": "UnsupportedMessageError",
|
||||
"description": "This payload will result in a negative number as message id"
|
||||
},
|
||||
"No Hash nor Status, but empty list": {
|
||||
|
|
|
@ -1,3 +1,13 @@
|
|||
# nim-eth
|
||||
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at
|
||||
# https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
{.used.}
|
||||
|
||||
import
|
||||
|
@ -23,7 +33,7 @@ let peer = res.get()
|
|||
|
||||
proc testThunk(payload: openArray[byte]) =
|
||||
var (msgId, msgData) = recvMsgMock(payload)
|
||||
waitFor peer.invokeThunk(msgId.int, msgData)
|
||||
waitFor peer.invokeThunk(msgId, msgData)
|
||||
|
||||
proc testPayloads(filename: string) =
|
||||
let js = json.parseFile(filename)
|
||||
|
|
Loading…
Reference in New Issue