Use asyncraises in p2p (#675)

* Use asyncraises in p2p

* Fix capture error in async proc

* Remove gcsafe from async procs
This commit is contained in:
andri lim 2024-02-14 15:59:13 +07:00 committed by GitHub
parent ce834e1287
commit b9c40e1380
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 46 additions and 42 deletions

View File

@ -122,14 +122,17 @@ proc newEthereumNode*(
result.addCapability(cap) result.addCapability(cap)
proc processIncoming(server: StreamServer, proc processIncoming(server: StreamServer,
remote: StreamTransport): Future[void] {.async, gcsafe.} = remote: StreamTransport): Future[void] {.async: (raises: []).} =
var node = getUserData[EthereumNode](server) try:
let peer = await node.rlpxAccept(remote) var node = getUserData[EthereumNode](server)
if not peer.isNil: let peer = await node.rlpxAccept(remote)
trace "Connection established (incoming)", peer if not peer.isNil:
if node.peerPool != nil: trace "Connection established (incoming)", peer
node.peerPool.connectingNodes.excl(peer.remote) if node.peerPool != nil:
node.peerPool.addPeer(peer) node.peerPool.connectingNodes.excl(peer.remote)
node.peerPool.addPeer(peer)
except CatchableError as exc:
error "processIncoming", msg=exc.msg
proc listeningAddress*(node: EthereumNode): ENode = proc listeningAddress*(node: EthereumNode): ENode =
node.toENode() node.toENode()

View File

@ -120,8 +120,9 @@ proc timeKeyPong(n: Node): TimeKey =
proc timeKeyPing(n: Node): TimeKey = proc timeKeyPing(n: Node): TimeKey =
timeKey(n.id, n.ip, cmdPing) timeKey(n.id, n.ip, cmdPing)
proc lastPingReceived(k: KademliaProtocol, n: Node): Time = when false:
k.pingPongTime.getOrDefault(n.timeKeyPing, 0'i64).fromUnix proc lastPingReceived(k: KademliaProtocol, n: Node): Time =
k.pingPongTime.getOrDefault(n.timeKeyPing, 0'i64).fromUnix
proc lastPongReceived(k: KademliaProtocol, n: Node): Time = proc lastPongReceived(k: KademliaProtocol, n: Node): Time =
k.pingPongTime.getOrDefault(n.timeKeyPong, 0'i64).fromUnix k.pingPongTime.getOrDefault(n.timeKeyPong, 0'i64).fromUnix
@ -182,9 +183,10 @@ proc updateLastPongReceived(k: KademliaProtocol, n: Node, t: Time) =
k.removeTooOldPingPongTime() k.removeTooOldPingPongTime()
k.pingPongTime[n.timeKeyPong] = t.toUnix k.pingPongTime[n.timeKeyPong] = t.toUnix
# checkBond checks if the given node has a recent enough endpoint proof. when false:
proc checkBond(k: KademliaProtocol, n: Node): bool = # checkBond checks if the given node has a recent enough endpoint proof.
getTime() - k.lastPongReceived(n) < BOND_EXPIRATION proc checkBond(k: KademliaProtocol, n: Node): bool =
getTime() - k.lastPongReceived(n) < BOND_EXPIRATION
proc newKBucket(istart, iend: NodeId): KBucket = proc newKBucket(istart, iend: NodeId): KBucket =
result.new() result.new()

View File

@ -1,3 +1,14 @@
# nim-eth
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
let protocolManager = ProtocolManager() let protocolManager = ProtocolManager()
# The variables above are immutable RTTI information. We need to tell # The variables above are immutable RTTI information. We need to tell
@ -64,3 +75,5 @@ proc initProtocolStates(peer: Peer, protocols: openArray[ProtocolInfo])
if peerStateInit != nil: if peerStateInit != nil:
peer.protocolStates[protocol.index] = peerStateInit(peer) peer.protocolStates[protocol.index] = peerStateInit(peer)
{.pop.}

View File

@ -149,7 +149,6 @@ let
const const
protocolCounter = CacheCounter"protocolCounter" protocolCounter = CacheCounter"protocolCounter"
template Opt(T): auto = newTree(nnkBracketExpr, Option, T)
template Fut(T): auto = newTree(nnkBracketExpr, Future, T) template Fut(T): auto = newTree(nnkBracketExpr, Future, T)
proc initFuture*[T](loc: var Future[T]) = proc initFuture*[T](loc: var Future[T]) =
@ -330,10 +329,6 @@ proc init*(T: type P2PProtocol, backendFactory: BackendFactory,
if not result.backend.afterProtocolInit.isNil: if not result.backend.afterProtocolInit.isNil:
result.backend.afterProtocolInit(result) result.backend.afterProtocolInit(result)
proc isFuture(t: NimNode): bool =
t.kind == nnkBracketExpr and eqIdent(t[0], "Future")
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) = proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
## This procs adds a set of common helpers available in all messages handlers ## This procs adds a set of common helpers available in all messages handlers
## (e.g. `perProtocolMsgId`, `peer.state`, etc). ## (e.g. `perProtocolMsgId`, `peer.state`, etc).
@ -362,22 +357,22 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
param[^2] = chooseFieldType(param[^2]) param[^2] = chooseFieldType(param[^2])
prelude.add quote do: prelude.add quote do:
type `currentProtocolSym` = `protocolNameIdent` type `currentProtocolSym` {.used.} = `protocolNameIdent`
if msgId >= 0 and p.isRlpx: if msgId >= 0 and p.isRlpx:
prelude.add quote do: prelude.add quote do:
const `perProtocolMsgIdVar` = `msgId` const `perProtocolMsgIdVar` {.used.} = `msgId`
# Define local accessors for the peer and the network protocol states # Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar) # inside each user message handler proc (e.g. peer.state.foo = bar)
if PeerStateType != nil: if PeerStateType != nil:
prelude.add quote do: prelude.add quote do:
template state(`peerVar`: `PeerType`): `PeerStateType` = template state(`peerVar`: `PeerType`): `PeerStateType` {.used.} =
`PeerStateType`(`getState`(`peerVar`, `protocolInfo`)) `PeerStateType`(`getState`(`peerVar`, `protocolInfo`))
if NetworkStateType != nil: if NetworkStateType != nil:
prelude.add quote do: prelude.add quote do:
template networkState(`peerVar`: `PeerType`): `NetworkStateType` = template networkState(`peerVar`: `PeerType`): `NetworkStateType` {.used.} =
`NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`)) `NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`))
proc addPreludeDefs*(userHandlerProc: NimNode, definitions: NimNode) = proc addPreludeDefs*(userHandlerProc: NimNode, definitions: NimNode) =
@ -395,7 +390,6 @@ proc addTimeoutParam(procDef: NimNode, defaultValue: int64) =
var var
Duration = bindSym"Duration" Duration = bindSym"Duration"
milliseconds = bindSym"milliseconds" milliseconds = bindSym"milliseconds"
lastParam = procDef.params[^1]
procDef.params.add newTree(nnkIdentDefs, procDef.params.add newTree(nnkIdentDefs,
timeoutVar, timeoutVar,
@ -696,8 +690,6 @@ proc useStandardBody*(sendProc: SendProc,
postSerialization = if postSerializationStep.isNil: newStmtList() postSerialization = if postSerializationStep.isNil: newStmtList()
else: postSerializationStep(outputStream) else: postSerializationStep(outputStream)
appendParams = newStmtList()
tracing = when not tracingEnabled: tracing = when not tracingEnabled:
newStmtList() newStmtList()
else: else:
@ -979,7 +971,7 @@ proc genCode*(p: P2PProtocol): NimNode =
regBody.add newCall(p.backend.registerProtocol, protocolVar) regBody.add newCall(p.backend.registerProtocol, protocolVar)
result.add quote do: result.add quote do:
proc `protocolReg`() {.raises: [RlpError, Defect].} = proc `protocolReg`() {.raises: [RlpError].} =
let `protocolVar` = `protocolInit` let `protocolVar` = `protocolInit`
`regBody` `regBody`
`protocolReg`() `protocolReg`()

View File

@ -155,7 +155,7 @@ type
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void] ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void]
{.gcsafe, raises: [RlpError].} {.gcsafe, async: (raises: [RlpError, CatchableError]).}
MessageContentPrinter* = proc(msg: pointer): string MessageContentPrinter* = proc(msg: pointer): string
{.gcsafe, raises: [].} {.gcsafe, raises: [].}

View File

@ -176,7 +176,7 @@ proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
# result = $(cast[ptr MsgType](msg)[]) # result = $(cast[ptr MsgType](msg)[])
proc disconnect*(peer: Peer, reason: DisconnectionReason, proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.gcsafe, async.} notifyOtherPeer = false) {.async: (raises:[CatchableError]).}
template raisePeerDisconnected(msg: string, r: DisconnectionReason) = template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg) var e = newException(PeerDisconnected, msg)
@ -215,9 +215,6 @@ proc handshakeImpl[T](peer: Peer,
# Dispatcher # Dispatcher
# #
proc hash(d: Dispatcher): int =
hash(d.protocolOffsets)
proc `==`(lhs, rhs: Dispatcher): bool = proc `==`(lhs, rhs: Dispatcher): bool =
lhs.activeProtocols == rhs.activeProtocols lhs.activeProtocols == rhs.activeProtocols
@ -365,8 +362,8 @@ proc supports*(peer: Peer, Protocol: type): bool =
template perPeerMsgId(peer: Peer, MsgType: type): int = template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId) perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] proc invokeThunk*(peer: Peer, msgId: int, msgData: Rlp): Future[void]
{.raises: [UnsupportedMessageError, RlpError].} = {.async: (raises: [CatchableError, rlp.RlpError]).} =
template invalidIdError: untyped = template invalidIdError: untyped =
raise newException(UnsupportedMessageError, raise newException(UnsupportedMessageError,
"RLPx message with an invalid id " & $msgId & "RLPx message with an invalid id " & $msgId &
@ -379,7 +376,7 @@ proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void]
let thunk = peer.dispatcher.messages[msgId].thunk let thunk = peer.dispatcher.messages[msgId].thunk
if thunk == nil: invalidIdError() if thunk == nil: invalidIdError()
return thunk(peer, msgId, msgData) await thunk(peer, msgId, msgData)
template compressMsg(peer: Peer, data: seq[byte]): seq[byte] = template compressMsg(peer: Peer, data: seq[byte]): seq[byte] =
when useSnappy: when useSnappy:
@ -389,7 +386,7 @@ template compressMsg(peer: Peer, data: seq[byte]): seq[byte] =
else: else:
data data
proc sendMsg*(peer: Peer, data: seq[byte]) {.gcsafe, async.} = proc sendMsg*(peer: Peer, data: seq[byte]) {.async.} =
var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState) var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState)
try: try:
var res = await peer.transport.write(cipherText) var res = await peer.transport.write(cipherText)
@ -861,10 +858,8 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
msgName = $msgIdent msgName = $msgIdent
msgRecName = msg.recName msgRecName = msg.recName
responseMsgId = if msg.response != nil: msg.response.id else: -1 responseMsgId = if msg.response != nil: msg.response.id else: -1
ResponseRecord = if msg.response != nil: msg.response.recName else: nil
hasReqId = msg.hasReqId hasReqId = msg.hasReqId
protocol = msg.protocol protocol = msg.protocol
userPragmas = msg.procDef.pragma
# variables used in the sending procs # variables used in the sending procs
peerOrResponder = ident"peerOrResponder" peerOrResponder = ident"peerOrResponder"
@ -922,7 +917,6 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
# The received RLP data is deserialized to a local variable of # The received RLP data is deserialized to a local variable of
# the message-specific type. This is done field by field here: # the message-specific type. This is done field by field here:
let msgNameLit = newLit(msgName)
readParams.add quote do: readParams.add quote do:
`receivedMsg`.`param` = `checkedRlpRead`(`peerVar`, `receivedRlp`, `paramType`) `receivedMsg`.`param` = `checkedRlpRead`(`peerVar`, `receivedRlp`, `paramType`)
@ -954,7 +948,7 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp) proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp)
# Fun error if you just use `RlpError` instead of `rlp.RlpError`: # Fun error if you just use `RlpError` instead of `rlp.RlpError`:
# "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'" # "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'"
{.async, gcsafe, raises: [rlp.RlpError].} = {.async: (raises: [rlp.RlpError, CatchableError]).} =
var `receivedRlp` = data var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecName` var `receivedMsg` {.noinit.}: `msgRecName`
`readParamsPrelude` `readParamsPrelude`
@ -1000,7 +994,7 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
let initWriter = quote do: let initWriter = quote do:
var `rlpWriter` = `initRlpWriter`() var `rlpWriter` = `initRlpWriter`()
const `perProtocolMsgIdVar` = `msgId` const `perProtocolMsgIdVar` {.used.} = `msgId`
let `perPeerMsgIdVar` = `perPeerMsgIdValue` let `perPeerMsgIdVar` = `perPeerMsgIdValue`
`append`(`rlpWriter`, `perPeerMsgIdVar`) `append`(`rlpWriter`, `perPeerMsgIdVar`)
@ -1088,7 +1082,7 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason):
trace "Disconnection handler ended with an error", err = f.error.msg trace "Disconnection handler ended with an error", err = f.error.msg
proc disconnect*(peer: Peer, reason: DisconnectionReason, proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} = notifyOtherPeer = false) {.async: (raises: [CatchableError]).} =
if peer.connectionState notin {Disconnecting, Disconnected}: if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting peer.connectionState = Disconnecting
# Do this first so sub-protocols have time to clean up and stop sending # Do this first so sub-protocols have time to clean up and stop sending
@ -1391,7 +1385,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
# TODO: rework rlpxAccept similar to rlpxConnect. # TODO: rework rlpxAccept similar to rlpxConnect.
proc rlpxAccept*( proc rlpxAccept*(
node: EthereumNode, transport: StreamTransport): Future[Peer] {.async.} = node: EthereumNode, transport: StreamTransport): Future[Peer] {.async: (raises: [CatchableError]).} =
initTracing(devp2pInfo, node.protocols) initTracing(devp2pInfo, node.protocols)
let peer = Peer(transport: transport, network: node) let peer = Peer(transport: transport, network: node)