mirror of https://github.com/status-im/nim-eth.git
Refactored the p2pProtocol macro to eliminate most code duplication in the backends
This commit is contained in:
parent
47c7a9887e
commit
f761889518
|
@ -1,4 +1,4 @@
|
|||
proc getState(peer: Peer, proto: ProtocolInfo): RootRef =
|
||||
proc getState*(peer: Peer, proto: ProtocolInfo): RootRef =
|
||||
peer.protocolStates[proto.index]
|
||||
|
||||
template state*(peer: Peer, Protocol: type): untyped =
|
||||
|
@ -8,7 +8,7 @@ template state*(peer: Peer, Protocol: type): untyped =
|
|||
bind getState
|
||||
cast[Protocol.State](getState(peer, Protocol.protocolInfo))
|
||||
|
||||
proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef =
|
||||
proc getNetworkState*(node: EthereumNode, proto: ProtocolInfo): RootRef =
|
||||
node.protocolStates[proto.index]
|
||||
|
||||
template protocolState*(node: EthereumNode, Protocol: type): untyped =
|
||||
|
@ -23,44 +23,6 @@ template networkState*(connection: Peer, Protocol: type): untyped =
|
|||
|
||||
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard
|
||||
|
||||
proc createPeerState[ProtocolState](peer: Peer): RootRef =
|
||||
var res = new ProtocolState
|
||||
mixin initProtocolState
|
||||
initProtocolState(res, peer)
|
||||
return cast[RootRef](res)
|
||||
|
||||
proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} =
|
||||
var res = new NetworkState
|
||||
mixin initProtocolState
|
||||
initProtocolState(res, network)
|
||||
return cast[RootRef](res)
|
||||
|
||||
proc chooseFieldType(n: NimNode): NimNode =
|
||||
## Examines the parameter types used in the message signature
|
||||
## and selects the corresponding field type for use in the
|
||||
## message object type (i.e. `p2p.hello`).
|
||||
##
|
||||
## For now, only openarray types are remapped to sequences.
|
||||
result = n
|
||||
if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"):
|
||||
result = n.copyNimTree
|
||||
result[0] = ident("seq")
|
||||
|
||||
proc popTimeoutParam(n: NimNode): NimNode =
|
||||
var lastParam = n.params[^1]
|
||||
if eqIdent(lastParam[0], "timeout"):
|
||||
if lastParam[2].kind == nnkEmpty:
|
||||
macros.error "You must specify a default value for the `timeout` parameter", lastParam
|
||||
result = lastParam
|
||||
n.params.del(n.params.len - 1)
|
||||
|
||||
proc verifyStateType(t: NimNode): NimNode =
|
||||
result = t[1]
|
||||
if result.kind == nnkSym and $result == "nil":
|
||||
return nil
|
||||
if result.kind != nnkBracketExpr or $result[0] != "ref":
|
||||
macros.error($result & " must be a ref type")
|
||||
|
||||
proc initFuture[T](loc: var Future[T]) =
|
||||
loc = newFuture[T]()
|
||||
|
||||
|
|
|
@ -0,0 +1,554 @@
|
|||
import
|
||||
macros,
|
||||
std_shims/macros_shim, chronos/timer
|
||||
|
||||
type
|
||||
MessageKind* = enum
|
||||
msgHandshake
|
||||
msgNotification
|
||||
msgRequest
|
||||
msgResponse
|
||||
|
||||
Message* = ref object
|
||||
id*: int
|
||||
ident*: NimNode
|
||||
kind*: MessageKind
|
||||
procDef*: NimNode
|
||||
timeoutParam*: NimNode
|
||||
recIdent*: NimNode
|
||||
recBody*: NimNode
|
||||
userHandler*: NimNode
|
||||
|
||||
Request* = ref object
|
||||
queries*: seq[Message]
|
||||
response*: Message
|
||||
|
||||
P2PProtocol* = ref object
|
||||
# Settings
|
||||
name*: string
|
||||
version*: int
|
||||
timeouts*: int64
|
||||
useRequestIds*: bool
|
||||
shortName*: string
|
||||
outgoingRequestDecorator*: NimNode
|
||||
incomingRequestDecorator*: NimNode
|
||||
incomingRequestThunkDecorator*: NimNode
|
||||
incomingResponseDecorator*: NimNode
|
||||
incomingResponseThunkDecorator*: NimNode
|
||||
PeerStateType*: NimNode
|
||||
NetworkStateType*: NimNode
|
||||
backend*: Backend
|
||||
|
||||
# Cached properties
|
||||
nameIdent*: NimNode
|
||||
protocolInfoVar*: NimNode
|
||||
|
||||
# All messages
|
||||
messages*: seq[Message]
|
||||
|
||||
# Messages by type:
|
||||
handshake*: Message
|
||||
notifications*: seq[Message]
|
||||
requests*: seq[Request]
|
||||
|
||||
# Output procs
|
||||
outSendProcs*: NimNode
|
||||
outRecvProcs*: NimNode
|
||||
outProcRegistrations*: NimNode
|
||||
|
||||
# Event handlers
|
||||
onPeerConnected*: NimNode
|
||||
onPeerDisconnected*: NimNode
|
||||
|
||||
Backend* = ref object
|
||||
# Code generators
|
||||
implementMsg*: proc (p: P2PProtocol, msg: Message, resp: Message = nil)
|
||||
implementProtocolInit*: proc (p: P2PProtocol): NimNode
|
||||
afterProtocolInit*: proc (p: P2PProtocol)
|
||||
|
||||
# Bound symbols to the back-end run-time types and procs
|
||||
PeerType*: NimNode
|
||||
NetworkType*: NimNode
|
||||
|
||||
registerProtocol*: NimNode
|
||||
setEventHandlers*: NimNode
|
||||
|
||||
BackendFactory* = proc (p: P2PProtocol): Backend
|
||||
|
||||
const
|
||||
defaultReqTimeout = 10.seconds
|
||||
|
||||
proc createPeerState[Peer, ProtocolState](peer: Peer): RootRef =
|
||||
var res = new ProtocolState
|
||||
mixin initProtocolState
|
||||
initProtocolState(res, peer)
|
||||
return cast[RootRef](res)
|
||||
|
||||
proc createNetworkState[NetworkNode, NetworkState](network: NetworkNode): RootRef {.gcsafe.} =
|
||||
var res = new NetworkState
|
||||
mixin initProtocolState
|
||||
initProtocolState(res, network)
|
||||
return cast[RootRef](res)
|
||||
|
||||
proc expectBlockWithProcs*(n: NimNode): seq[NimNode] =
|
||||
template helperName: auto = $n[0]
|
||||
|
||||
if n.len != 2 or n[1].kind != nnkStmtList:
|
||||
error(helperName & " expects a block", n)
|
||||
|
||||
for p in n[1]:
|
||||
if p.kind == nnkProcDef:
|
||||
result.add p
|
||||
elif p.kind == nnkCommentStmt:
|
||||
continue
|
||||
else:
|
||||
error(helperName & " expects a proc definition.", p)
|
||||
|
||||
proc nameOrNil*(procDef: NimNode): NimNode =
|
||||
if procDef != nil:
|
||||
procDef.name
|
||||
else:
|
||||
newNilLit()
|
||||
|
||||
proc chooseFieldType(n: NimNode): NimNode =
|
||||
## Examines the parameter types used in the message signature
|
||||
## and selects the corresponding field type for use in the
|
||||
## message object type (i.e. `p2p.hello`).
|
||||
##
|
||||
## For now, only openarray types are remapped to sequences.
|
||||
result = n
|
||||
if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"):
|
||||
result = n.copyNimTree
|
||||
result[0] = ident("seq")
|
||||
|
||||
proc verifyStateType(t: NimNode): NimNode =
|
||||
result = t[1]
|
||||
if result.kind == nnkSym and $result == "nil":
|
||||
return nil
|
||||
if result.kind != nnkBracketExpr or $result[0] != "ref":
|
||||
error $result & " must be a ref type"
|
||||
|
||||
proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode)
|
||||
|
||||
proc init*(T: type P2PProtocol, backendFactory: BackendFactory,
|
||||
name: string, version: int, body: NimNode,
|
||||
timeouts: int64, useRequestIds: bool, shortName: string,
|
||||
outgoingRequestDecorator: NimNode,
|
||||
incomingRequestDecorator: NimNode,
|
||||
incomingRequestThunkDecorator: NimNode,
|
||||
incomingResponseDecorator: NimNode,
|
||||
incomingResponseThunkDecorator: NimNode,
|
||||
peerState, networkState: NimNode): P2PProtocol =
|
||||
|
||||
result = P2PProtocol(
|
||||
name: name,
|
||||
version: version,
|
||||
timeouts: timeouts,
|
||||
useRequestIds: useRequestIds,
|
||||
shortName: shortName,
|
||||
outgoingRequestDecorator: outgoingRequestDecorator,
|
||||
incomingRequestDecorator: incomingRequestDecorator,
|
||||
incomingRequestThunkDecorator: incomingRequestThunkDecorator,
|
||||
incomingResponseDecorator: incomingResponseDecorator,
|
||||
incomingResponseThunkDecorator: incomingResponseThunkDecorator,
|
||||
PeerStateType: verifyStateType peerState,
|
||||
NetworkStateType: verifyStateType networkState,
|
||||
nameIdent: ident(name),
|
||||
protocolInfoVar: ident(name & "Protocol"),
|
||||
outSendProcs: newStmtList(),
|
||||
outRecvProcs: newStmtList(),
|
||||
outProcRegistrations: newStmtList())
|
||||
|
||||
result.backend = backendFactory(result)
|
||||
result.processProtocolBody body
|
||||
|
||||
if not result.backend.afterProtocolInit.isNil:
|
||||
result.backend.afterProtocolInit(result)
|
||||
|
||||
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
|
||||
## This procs adds a set of common helpers available in all messages handlers
|
||||
## (e.g. `perProtocolMsgId`, `peer.state`, etc).
|
||||
var
|
||||
prelude = newStmtList()
|
||||
getState = ident"getState"
|
||||
getNetworkState = ident"getNetworkState"
|
||||
currentProtocolSym = ident"CurrentProtocol"
|
||||
perProtocolMsgId = ident"perProtocolMsgId"
|
||||
protocolInfoVar = p.protocolInfoVar
|
||||
protocolNameIdent = p.nameIdent
|
||||
PeerType = p.backend.PeerType
|
||||
PeerStateType = p.PeerStateType
|
||||
NetworkStateType = p.NetworkStateType
|
||||
|
||||
userHandlerProc.addPragma ident"gcsafe"
|
||||
userHandlerProc.addPragma ident"async"
|
||||
|
||||
userHandlerProc.body.insert 0, prelude
|
||||
|
||||
when false:
|
||||
# TODO
|
||||
## Turns a regular proc definition into an async proc and adds
|
||||
## the helpers for accessing the peer and network protocol states.
|
||||
case msgKind
|
||||
of msgRequest: userHandlerProc.applyDecorator incomingRequestDecorator
|
||||
of msgResponse: userHandlerProc.applyDecorator incomingResponseDecorator
|
||||
else: discard
|
||||
|
||||
# We allow the user handler to use `openarray` params, but we turn
|
||||
# those into sequences to make the `async` pragma happy.
|
||||
for i in 1 ..< userHandlerProc.params.len:
|
||||
var param = userHandlerProc.params[i]
|
||||
param[^2] = chooseFieldType(param[^2])
|
||||
|
||||
prelude.add quote do:
|
||||
type `currentProtocolSym` = `protocolNameIdent`
|
||||
|
||||
if msgId >= 0:
|
||||
prelude.add quote do:
|
||||
const `perProtocolMsgId` = `msgId`
|
||||
|
||||
# Define local accessors for the peer and the network protocol states
|
||||
# inside each user message handler proc (e.g. peer.state.foo = bar)
|
||||
if PeerStateType != nil:
|
||||
prelude.add quote do:
|
||||
template state(p: `PeerType`): `PeerStateType` =
|
||||
cast[`PeerStateType`](`getState`(p, `protocolInfoVar`))
|
||||
|
||||
if NetworkStateType != nil:
|
||||
prelude.add quote do:
|
||||
template networkState(p: `PeerType`): `NetworkStateType` =
|
||||
cast[`NetworkStateType`](`getNetworkState`(p.network, `protocolInfoVar`))
|
||||
|
||||
proc addPreludeDefs*(userHandlerProc: NimNode, definitions: NimNode) =
|
||||
userHandlerProc.body[0].add definitions
|
||||
|
||||
proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode =
|
||||
## Turns a "named" do block to a regular async proc
|
||||
## (e.g. onPeerConnected do ...)
|
||||
result = newTree(nnkProcDef)
|
||||
doBlock.copyChildrenTo(result)
|
||||
result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName)
|
||||
p.augmentUserHandler result
|
||||
|
||||
proc ensureTimeoutParam(procDef: NimNode, timeouts: int64): NimNode =
|
||||
## Make sure the messages has a timeout parameter and it has the correct type.
|
||||
## The parameter will be removed from the signature and returned for caching
|
||||
## in the Message's timeoutParam field. It is needed only for the send procs.
|
||||
var
|
||||
Duration = bindSym"Duration"
|
||||
milliseconds = bindSym"milliseconds"
|
||||
lastParam = procDef.params[^1]
|
||||
|
||||
if eqIdent(lastParam[0], "timeout"):
|
||||
if lastParam[2].kind == nnkEmpty:
|
||||
error "You must specify a default value for the `timeout` parameter", lastParam
|
||||
lastParam[2] = newCall(milliseconds, newLit(100))# newCall(Duration, lastParam[2])
|
||||
if lastParam[1].kind == nnkEmpty:
|
||||
lastParam[1] = Duration
|
||||
elif not eqIdent(lastParam[1], "Duration"):
|
||||
error "The timeout parameter should be of type 'chronos.Duration'", lastParam[1]
|
||||
|
||||
result = lastParam
|
||||
procDef.params.del(procDef.params.len - 1)
|
||||
|
||||
else:
|
||||
result = newTree(nnkIdentDefs,
|
||||
ident"timeout",
|
||||
Duration,
|
||||
newCall(milliseconds, newLit(timeouts)))
|
||||
|
||||
proc newMsg(p: P2PProtocol, kind: MessageKind, id: int,
|
||||
procDef: NimNode, timeoutParam: NimNode = nil): Message =
|
||||
|
||||
if procDef[0].kind == nnkPostfix:
|
||||
error("p2pProcotol procs are public by default. " &
|
||||
"Please remove the postfix `*`.", procDef)
|
||||
|
||||
var
|
||||
msgIdent = procDef.name
|
||||
msgName = $msgIdent
|
||||
recFields = newTree(nnkRecList)
|
||||
recBody = newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), recFields)
|
||||
recName = ident(msgName & "Obj")
|
||||
|
||||
for param, paramType in procDef.typedParams(skip = 1):
|
||||
recFields.add newTree(nnkIdentDefs,
|
||||
newTree(nnkPostfix, ident("*"), param), # The fields are public
|
||||
chooseFieldType(paramType), # some types such as openarray
|
||||
# are automatically remapped
|
||||
newEmptyNode())
|
||||
|
||||
result = Message(id: id, ident: msgIdent, kind: kind,
|
||||
procDef: procDef, recIdent: recName, recBody: recBody,
|
||||
timeoutParam: timeoutParam)
|
||||
|
||||
if procDef.body.kind != nnkEmpty:
|
||||
result.userHandler = copy procDef
|
||||
p.augmentUserHandler result.userHandler
|
||||
result.userHandler.name = genSym(nskProc, msgName)
|
||||
|
||||
p.messages.add result
|
||||
|
||||
proc identWithExportMarker*(msg: Message): NimNode =
|
||||
newTree(nnkPostfix, ident("*"), msg.ident)
|
||||
|
||||
proc createSendProc*(msg: Message, procType = nnkProcDef): NimNode =
|
||||
# TODO: file an issue:
|
||||
# macros.newProc and macros.params doesn't work with nnkMacroDef
|
||||
|
||||
let pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
|
||||
else: newEmptyNode()
|
||||
|
||||
result = newNimNode(procType).add(
|
||||
msg.identWithExportMarker, ## name
|
||||
newEmptyNode(),
|
||||
newEmptyNode(),
|
||||
msg.procDef.params.copy, ## params
|
||||
pragmas,
|
||||
newEmptyNode(),
|
||||
newStmtList()) ## body
|
||||
|
||||
if msg.kind in {msgHandshake, msgRequest}:
|
||||
result[3].add msg.timeoutParam
|
||||
|
||||
result[3][0] = if procType == nnkMacroDef: ident "untyped"
|
||||
else: newTree(nnkBracketExpr, ident("Future"), msg.recIdent)
|
||||
|
||||
proc appendAllParams*(node: NimNode, procDef: NimNode, skipFirst = 0): NimNode =
|
||||
result = node
|
||||
for p, _ in procDef.typedParams(skip = skipFirst):
|
||||
result.add p
|
||||
|
||||
proc netInit*(p: P2PProtocol): NimNode =
|
||||
if p.NetworkStateType == nil:
|
||||
newNilLit()
|
||||
else:
|
||||
newTree(nnkBracketExpr, bindSym"createNetworkState",
|
||||
p.backend.NetworkType,
|
||||
p.NetworkStateType)
|
||||
|
||||
proc peerInit*(p: P2PProtocol): NimNode =
|
||||
if p.PeerStateType == nil:
|
||||
newNilLit()
|
||||
else:
|
||||
newTree(nnkBracketExpr, bindSym"createPeerState",
|
||||
p.backend.PeerType,
|
||||
p.PeerStateType)
|
||||
|
||||
proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
|
||||
## This procs handles all DSL statements valid inside a p2pProtocol.
|
||||
##
|
||||
## It will populate the protocol's fields such as:
|
||||
## * handshake
|
||||
## * requests
|
||||
## * notifications
|
||||
## * onPeerConnected
|
||||
## * onPeerDisconnected
|
||||
##
|
||||
## All messages will have properly computed numeric IDs
|
||||
##
|
||||
var nextId = 0
|
||||
|
||||
for n in protocolBody:
|
||||
case n.kind
|
||||
of {nnkCall, nnkCommand}:
|
||||
if eqIdent(n[0], "nextID"):
|
||||
# By default message IDs are assigned in increasing order
|
||||
# `nextID` can be used to skip some of the numeric slots
|
||||
if n.len == 2 and n[1].kind == nnkIntLit:
|
||||
nextId = n[1].intVal.int
|
||||
else:
|
||||
error("nextID expects a single int value", n)
|
||||
|
||||
elif eqIdent(n[0], "requestResponse"):
|
||||
# `requestResponse` can be given a block of 2 or more procs.
|
||||
# The last one is considered to be a response message, while
|
||||
# all preceeding ones are requests triggering the response.
|
||||
# The system makes sure to automatically insert a hidden `reqId`
|
||||
# parameter used to discriminate the individual messages.
|
||||
let procs = expectBlockWithProcs(n)
|
||||
if procs.len < 2:
|
||||
error "requestResponse expects a block with at least two proc definitions"
|
||||
|
||||
var queries = newSeq[Message]()
|
||||
for i in 0 .. procs.len - 2:
|
||||
var timeout = ensureTimeoutParam(procs[i], p.timeouts)
|
||||
queries.add p.newMsg(msgRequest, nextId + i, procs[i], timeout)
|
||||
|
||||
p.requests.add Request(
|
||||
queries: queries,
|
||||
response: p.newMsg(msgResponse, nextId + procs.len - 1, procs[^1]))
|
||||
|
||||
inc nextId, procs.len
|
||||
|
||||
elif eqIdent(n[0], "handshake"):
|
||||
let procs = expectBlockWithProcs(n)
|
||||
if procs.len != 1:
|
||||
error "handshake expects a block with a single proc definition", n
|
||||
|
||||
if p.handshake != nil:
|
||||
error "The handshake for the protocol is already defined", n
|
||||
|
||||
var timeout = ensureTimeoutParam(procs[0], p.timeouts)
|
||||
p.handshake = p.newMsg(msgHandshake, nextId, procs[0], timeout)
|
||||
inc nextId
|
||||
|
||||
elif eqIdent(n[0], "onPeerConnected"):
|
||||
p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected")
|
||||
|
||||
elif eqIdent(n[0], "onPeerDisconnected"):
|
||||
p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected")
|
||||
|
||||
else:
|
||||
error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
|
||||
|
||||
of nnkProcDef:
|
||||
p.notifications.add p.newMsg(msgNotification, nextId, n)
|
||||
inc nextId
|
||||
|
||||
of nnkCommentStmt:
|
||||
discard
|
||||
|
||||
else:
|
||||
error "Illegal syntax in a P2P protocol definition", n
|
||||
|
||||
proc genTypeSection*(p: P2PProtocol): NimNode =
|
||||
var
|
||||
protocolName = p.nameIdent
|
||||
peerState = p.PeerStateType
|
||||
networkState= p.NetworkStateType
|
||||
|
||||
result = newStmtList()
|
||||
result.add quote do:
|
||||
# Create a type acting as a pseudo-object representing the protocol
|
||||
# (e.g. p2p)
|
||||
type `protocolName`* = object
|
||||
|
||||
if peerState != nil:
|
||||
result.add quote do:
|
||||
template State*(P: type `protocolName`): type = `peerState`
|
||||
|
||||
if networkState != nil:
|
||||
result.add quote do:
|
||||
template NetworkState*(P: type `protocolName`): type = `networkState`
|
||||
|
||||
for msg in p.messages:
|
||||
let
|
||||
msgId = msg.id
|
||||
msgName = msg.ident
|
||||
msgRecName = msg.recIdent
|
||||
msgRecBody = msg.recBody
|
||||
|
||||
result.add quote do:
|
||||
# This is a type featuring a single field for each message param:
|
||||
type `msgRecName`* = `msgRecBody`
|
||||
|
||||
# Add a helper template for accessing the message type:
|
||||
# e.g. p2p.hello:
|
||||
template `msgName`*(T: type `protocolName`): type = `msgRecName`
|
||||
|
||||
# Add a helper template for obtaining the message Id for
|
||||
# a particular message type:
|
||||
template msgId*(T: type `msgRecName`): int = `msgId`
|
||||
template msgProtocol*(T: type `msgRecName`): type = `protocolName`
|
||||
|
||||
proc genCode*(p: P2PProtocol): NimNode =
|
||||
# TODO: try switching to a simpler for msg in p.messages: loop
|
||||
if p.handshake != nil:
|
||||
p.backend.implementMsg p, p.handshake
|
||||
|
||||
for msg in p.notifications:
|
||||
p.backend.implementMsg p, msg
|
||||
|
||||
for req in p.requests:
|
||||
p.backend.implementMsg p, req.response
|
||||
for query in req.queries: p.backend.implementMsg(p, query, req.response)
|
||||
|
||||
result = newStmtList()
|
||||
result.add p.genTypeSection()
|
||||
|
||||
let
|
||||
protocolInfoVar = p.protocolInfoVar
|
||||
protocolName = p.nameIdent
|
||||
protocolInit = p.backend.implementProtocolInit(p)
|
||||
|
||||
result.add quote do:
|
||||
# One global variable per protocol holds the protocol run-time data
|
||||
var p = `protocolInit`
|
||||
var `protocolInfoVar` = addr p
|
||||
|
||||
# The protocol run-time data is available as a pseudo-field
|
||||
# (e.g. `p2p.protocolInfo`)
|
||||
template protocolInfo*(P: type `protocolName`): auto = `protocolInfoVar`
|
||||
|
||||
result.add p.outSendProcs,
|
||||
p.outRecvProcs,
|
||||
p.outProcRegistrations
|
||||
|
||||
if p.onPeerConnected != nil: result.add p.onPeerConnected
|
||||
if p.onPeerDisconnected != nil: result.add p.onPeerDisconnected
|
||||
|
||||
result.add newCall(p.backend.setEventHandlers,
|
||||
protocolInfoVar,
|
||||
nameOrNil p.onPeerConnected,
|
||||
nameOrNil p.onPeerDisconnected)
|
||||
|
||||
result.add newCall(p.backend.registerProtocol, protocolInfoVar)
|
||||
|
||||
macro emitForSingleBackend(
|
||||
name: static[string],
|
||||
version: static[int],
|
||||
backend: static[BackendFactory],
|
||||
body: untyped,
|
||||
# TODO Nim can't handle a proper duration paramter here
|
||||
timeouts: static[int64] = defaultReqTimeout.milliseconds,
|
||||
useRequestIds: static[bool] = true,
|
||||
shortName: static[string] = "",
|
||||
outgoingRequestDecorator: untyped = nil,
|
||||
incomingRequestDecorator: untyped = nil,
|
||||
incomingRequestThunkDecorator: untyped = nil,
|
||||
incomingResponseDecorator: untyped = nil,
|
||||
incomingResponseThunkDecorator: untyped = nil,
|
||||
peerState = type(nil),
|
||||
networkState = type(nil)): untyped =
|
||||
|
||||
var p = P2PProtocol.init(
|
||||
backend,
|
||||
name, version, body, timeouts,
|
||||
useRequestIds, shortName,
|
||||
outgoingRequestDecorator,
|
||||
incomingRequestDecorator,
|
||||
incomingRequestThunkDecorator,
|
||||
incomingResponseDecorator,
|
||||
incomingResponseThunkDecorator,
|
||||
peerState.getType, networkState.getType)
|
||||
|
||||
result = p.genCode()
|
||||
|
||||
when defined(debugRlpxProtocol) or defined(debugMacros):
|
||||
echo repr(result)
|
||||
|
||||
macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped =
|
||||
let name = $(options[0])
|
||||
|
||||
var backends = newSeq[NimNode]()
|
||||
if backendSyms.kind == nnkSym:
|
||||
backends.add backendSyms
|
||||
else:
|
||||
for backend in backendSyms:
|
||||
backends.add backend
|
||||
|
||||
result = newStmtList()
|
||||
|
||||
for backend in backends:
|
||||
let call = copy options
|
||||
call[0] = bindSym"emitForSingleBackend"
|
||||
call.add newTree(nnkExprEqExpr, ident("name"), newLit(name))
|
||||
call.add newTree(nnkExprEqExpr, ident("backend"), backend)
|
||||
call.add newTree(nnkExprEqExpr, ident("body"), body)
|
||||
result.add call
|
||||
|
||||
template p2pProtocol*(options: untyped, body: untyped) {.dirty.} =
|
||||
bind emitForAllBackends
|
||||
emitForAllBackends(p2pProtocolBackendImpl, options, body)
|
||||
|
|
@ -144,11 +144,6 @@ type
|
|||
DisconnectionHandler* = proc(peer: Peer,
|
||||
reason: DisconnectionReason): Future[void] {.gcsafe.}
|
||||
|
||||
RlpxMessageKind* = enum
|
||||
rlpxNotification,
|
||||
rlpxRequest,
|
||||
rlpxResponse
|
||||
|
||||
ConnectionState* = enum
|
||||
None,
|
||||
Connecting,
|
||||
|
@ -177,3 +172,5 @@ type
|
|||
|
||||
Response*[MsgType] = distinct Peer
|
||||
|
||||
proc `$`*(peer: Peer): string = $peer.remote
|
||||
|
||||
|
|
461
eth/p2p/rlpx.nim
461
eth/p2p/rlpx.nim
|
@ -1,18 +1,20 @@
|
|||
import
|
||||
macros, tables, algorithm, deques, hashes, options, typetraits,
|
||||
std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys, async_utils],
|
||||
private/p2p_types, kademlia, auth, rlpxcrypt, enode
|
||||
private/p2p_types, kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl
|
||||
|
||||
when useSnappy:
|
||||
import snappy
|
||||
const devp2pSnappyVersion* = 5
|
||||
|
||||
export
|
||||
p2pProtocol
|
||||
|
||||
logScope:
|
||||
topics = "rlpx"
|
||||
|
||||
const
|
||||
devp2pVersion* = 4
|
||||
defaultReqTimeout = milliseconds(10000)
|
||||
maxMsgSize = 1024 * 1024
|
||||
|
||||
include p2p_tracing
|
||||
|
@ -305,10 +307,10 @@ proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
|
|||
rlpWriter.appendRecordType(msg, Msg.rlpFieldsCount > 1)
|
||||
peer.sendMsg rlpWriter.finish
|
||||
|
||||
proc registerRequest*(peer: Peer,
|
||||
timeout: Duration,
|
||||
responseFuture: FutureBase,
|
||||
responseMsgId: int): int =
|
||||
proc registerRequest(peer: Peer,
|
||||
timeout: Duration,
|
||||
responseFuture: FutureBase,
|
||||
responseMsgId: int): int =
|
||||
inc peer.lastReqId
|
||||
result = peer.lastReqId
|
||||
|
||||
|
@ -581,164 +583,74 @@ proc dispatchMessages*(peer: Peer) {.async.} =
|
|||
return
|
||||
peer.awaitedMessages[msgId] = nil
|
||||
|
||||
macro p2pProtocolImpl(name: static[string],
|
||||
version: static[uint],
|
||||
body: untyped,
|
||||
# TODO Nim can't handle a proper duration paramter here
|
||||
timeout: static[int64] = defaultReqTimeout.milliseconds,
|
||||
useRequestIds: static[bool] = true,
|
||||
shortName: static[string] = "",
|
||||
outgoingRequestDecorator: untyped = nil,
|
||||
incomingRequestDecorator: untyped = nil,
|
||||
incomingRequestThunkDecorator: untyped = nil,
|
||||
incomingResponseDecorator: untyped = nil,
|
||||
incomingResponseThunkDecorator: untyped = nil,
|
||||
peerState = type(nil),
|
||||
networkState = type(nil)): untyped =
|
||||
## The macro used to defined RLPx sub-protocols. See README.
|
||||
var
|
||||
# XXX: deal with a Nim bug causing the macro params to be
|
||||
# zero when they are captured by a closure:
|
||||
outgoingRequestDecorator = outgoingRequestDecorator
|
||||
incomingRequestDecorator = incomingRequestDecorator
|
||||
incomingRequestThunkDecorator = incomingRequestThunkDecorator
|
||||
incomingResponseDecorator = incomingResponseDecorator
|
||||
incomingResponseThunkDecorator = incomingResponseThunkDecorator
|
||||
useRequestIds = useRequestIds
|
||||
version = version
|
||||
defaultTimeout = timeout
|
||||
template applyDecorator(p: NimNode, decorator: NimNode) =
|
||||
if decorator.kind != nnkNilLit: p.addPragma decorator
|
||||
|
||||
nextId = 0
|
||||
protoName = name
|
||||
shortName = if shortName.len > 0: shortName else: protoName
|
||||
outTypes = newNimNode(nnkStmtList)
|
||||
outSendProcs = newNimNode(nnkStmtList)
|
||||
outRecvProcs = newNimNode(nnkStmtList)
|
||||
outProcRegistrations = newNimNode(nnkStmtList)
|
||||
protoNameIdent = ident(protoName)
|
||||
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
let
|
||||
resultIdent = ident "result"
|
||||
perProtocolMsgId = ident"perProtocolMsgId"
|
||||
response = ident"response"
|
||||
currentProtocolSym = ident"CurrentProtocol"
|
||||
protocol = ident(protoName & "Protocol")
|
||||
isSubprotocol = version > 0'u
|
||||
peerState = verifyStateType peerState.getType
|
||||
networkState = verifyStateType networkState.getType
|
||||
handshake = newNilLit()
|
||||
disconnectHandler = newNilLit()
|
||||
isSubprotocol = p.version > 0
|
||||
Option = bindSym "Option"
|
||||
# XXX: Binding the int type causes instantiation failure for some reason
|
||||
# Int = bindSym "int"
|
||||
Int = ident "int"
|
||||
Peer = bindSym "Peer"
|
||||
Duration = bindSym "Duration"
|
||||
milliseconds = bindSym "milliseconds"
|
||||
createNetworkState = bindSym "createNetworkState"
|
||||
createPeerState = bindSym "createPeerState"
|
||||
finish = bindSym "finish"
|
||||
EthereumNode = bindSym "EthereumNode"
|
||||
Response = bindSym "Response"
|
||||
ResponseWithId = bindSym "ResponseWithId"
|
||||
perProtocolMsgId = ident"perProtocolMsgId"
|
||||
|
||||
initRlpWriter = bindSym "initRlpWriter"
|
||||
safeEnterList = bindSym "safeEnterList"
|
||||
messagePrinter = bindSym "messagePrinter"
|
||||
initProtocol = bindSym "initProtocol"
|
||||
nextMsgResolver = bindSym "nextMsgResolver"
|
||||
rlpFromBytes = bindSym "rlpFromBytes"
|
||||
append = bindSym("append", brForceOpen)
|
||||
read = bindSym("read", brForceOpen)
|
||||
startList = bindSym "startList"
|
||||
enterList = bindSym "enterList"
|
||||
finish = bindSym "finish"
|
||||
|
||||
messagePrinter = bindSym "messagePrinter"
|
||||
nextMsgResolver = bindSym "nextMsgResolver"
|
||||
registerRequest = bindSym "registerRequest"
|
||||
requestResolver = bindSym "requestResolver"
|
||||
resolveResponseFuture = bindSym "resolveResponseFuture"
|
||||
rlpFromBytes = bindSym "rlpFromBytes"
|
||||
checkedRlpRead = bindSym "checkedRlpRead"
|
||||
sendMsg = bindSym "sendMsg"
|
||||
startList = bindSym "startList"
|
||||
nextMsg = bindSym "nextMsg"
|
||||
initProtocol = bindSym"initProtocol"
|
||||
registerMsg = bindSym "registerMsg"
|
||||
writeMsgId = bindSym "writeMsgId"
|
||||
getState = bindSym "getState"
|
||||
getNetworkState = bindSym "getNetworkState"
|
||||
perPeerMsgId = bindSym "perPeerMsgId"
|
||||
perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl"
|
||||
linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"
|
||||
shortName = if p.shortName.len > 0: p.shortName else: p.name
|
||||
|
||||
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
|
||||
doAssert shortName.len == 3
|
||||
|
||||
template applyDecorator(p: NimNode, decorator: NimNode) =
|
||||
if decorator.kind != nnkNilLit: p.addPragma decorator
|
||||
new result
|
||||
|
||||
proc augmentUserHandler(userHandlerProc: NimNode,
|
||||
msgId = -1,
|
||||
msgKind = rlpxNotification,
|
||||
extraDefinitions: NimNode = nil) =
|
||||
## Turns a regular proc definition into an async proc and adds
|
||||
## the helpers for accessing the peer and network protocol states.
|
||||
case msgKind
|
||||
of rlpxRequest: userHandlerProc.applyDecorator incomingRequestDecorator
|
||||
of rlpxResponse: userHandlerProc.applyDecorator incomingResponseDecorator
|
||||
else: discard
|
||||
|
||||
userHandlerProc.addPragma ident"gcsafe"
|
||||
userHandlerProc.addPragma ident"async"
|
||||
|
||||
# We allow the user handler to use `openarray` params, but we turn
|
||||
# those into sequences to make the `async` pragma happy.
|
||||
for i in 1 ..< userHandlerProc.params.len:
|
||||
var param = userHandlerProc.params[i]
|
||||
param[^2] = chooseFieldType(param[^2])
|
||||
|
||||
var userHandlerDefinitions = newStmtList()
|
||||
|
||||
userHandlerDefinitions.add quote do:
|
||||
type `currentProtocolSym` = `protoNameIdent`
|
||||
|
||||
if extraDefinitions != nil:
|
||||
userHandlerDefinitions.add extraDefinitions
|
||||
|
||||
if msgId >= 0:
|
||||
userHandlerDefinitions.add quote do:
|
||||
const `perProtocolMsgId` = `msgId`
|
||||
|
||||
# Define local accessors for the peer and the network protocol states
|
||||
# inside each user message handler proc (e.g. peer.state.foo = bar)
|
||||
if peerState != nil:
|
||||
userHandlerDefinitions.add quote do:
|
||||
template state(p: `Peer`): `peerState` =
|
||||
cast[`peerState`](`getState`(p, `protocol`))
|
||||
|
||||
if networkState != nil:
|
||||
userHandlerDefinitions.add quote do:
|
||||
template networkState(p: `Peer`): `networkState` =
|
||||
cast[`networkState`](`getNetworkState`(p.network, `protocol`))
|
||||
|
||||
userHandlerProc.body.insert 0, userHandlerDefinitions
|
||||
|
||||
proc liftEventHandler(doBlock: NimNode, handlerName: string): NimNode =
|
||||
## Turns a "named" do block to a regular async proc
|
||||
## (e.g. onPeerConnected do ...)
|
||||
var fn = newTree(nnkProcDef)
|
||||
doBlock.copyChildrenTo(fn)
|
||||
result = genSym(nskProc, protoName & handlerName)
|
||||
fn.name = result
|
||||
augmentUserHandler fn
|
||||
outRecvProcs.add fn
|
||||
|
||||
proc addMsgHandler(msgId: int, n: NimNode,
|
||||
msgKind = rlpxNotification,
|
||||
responseMsgId = -1,
|
||||
responseRecord: NimNode = nil): NimNode =
|
||||
if n[0].kind == nnkPostfix:
|
||||
macros.error("p2pProcotol procs are public by default. " &
|
||||
"Please remove the postfix `*`.", n)
|
||||
|
||||
let
|
||||
msgIdent = n.name
|
||||
msgName = $n.name
|
||||
hasReqIds = useRequestIds and msgKind in {rlpxRequest, rlpxResponse}
|
||||
result.registerProtocol = bindSym "registerProtocol"
|
||||
result.setEventHandlers = bindSym "setEventHandlers"
|
||||
result.PeerType = Peer
|
||||
result.NetworkType = EthereumNode
|
||||
|
||||
result.implementMsg = proc (p: P2PProtocol, msg: Message, resp: Message = nil) =
|
||||
var
|
||||
msgId = msg.id
|
||||
msgRecName = msg.recIdent
|
||||
msgKind = msg.kind
|
||||
n = msg.procDef
|
||||
responseMsgId = if resp != nil: resp.id else: -1
|
||||
responseRecord = if resp != nil: resp.recIdent else: nil
|
||||
msgIdent = n.name
|
||||
msgName = $msgIdent
|
||||
hasReqIds = p.useRequestIds and msgKind in {msgRequest, msgResponse}
|
||||
userPragmas = n.pragma
|
||||
|
||||
# variables used in the sending procs
|
||||
msgRecipient = ident"msgRecipient"
|
||||
sendTo = ident"sendTo"
|
||||
reqTimeout: NimNode
|
||||
rlpWriter = ident"writer"
|
||||
appendParams = newNimNode(nnkStmtList)
|
||||
paramsToWrite = newSeq[NimNode](0)
|
||||
|
@ -754,37 +666,16 @@ macro p2pProtocolImpl(name: static[string],
|
|||
callResolvedResponseFuture = newNimNode(nnkStmtList)
|
||||
|
||||
# nodes to store the user-supplied message handling proc if present
|
||||
userHandlerProc: NimNode = nil
|
||||
userHandlerCall: NimNode = nil
|
||||
awaitUserHandler = newStmtList()
|
||||
|
||||
# a record type associated with the message
|
||||
msgRecord = newIdentNode(msgName & "Obj")
|
||||
msgRecordFields = newTree(nnkRecList)
|
||||
msgRecordBody = newTree(nnkObjectTy,
|
||||
newEmptyNode(),
|
||||
newEmptyNode(),
|
||||
msgRecordFields)
|
||||
|
||||
result = msgRecord
|
||||
if hasReqIds:
|
||||
# Messages using request Ids
|
||||
readParams.add quote do:
|
||||
let `reqId` = `read`(`receivedRlp`, int)
|
||||
|
||||
case msgKind
|
||||
of rlpxNotification: discard
|
||||
of rlpxRequest:
|
||||
# If the request proc has a default timeout specified, remove it from
|
||||
# the signature for now so we can generate the `thunk` proc without it.
|
||||
# The parameter will be added back later only for to the sender proc.
|
||||
# When the timeout is not specified, we use a default one.
|
||||
reqTimeout = popTimeoutParam(n)
|
||||
if reqTimeout == nil:
|
||||
reqTimeout = newTree(nnkIdentDefs,
|
||||
ident"timeout",
|
||||
Duration, newCall(milliseconds, newLit(defaultTimeout)))
|
||||
|
||||
of msgRequest:
|
||||
let reqToResponseOffset = responseMsgId - msgId
|
||||
let responseMsgId = quote do: `perPeerMsgIdVar` + `reqToResponseOffset`
|
||||
|
||||
|
@ -794,7 +685,7 @@ macro p2pProtocolImpl(name: static[string],
|
|||
# assumes there is one outstanding request at a time (if there are
|
||||
# multiple requests we'll resolve them in FIFO order).
|
||||
let registerRequestCall = newCall(registerRequest, msgRecipient,
|
||||
reqTimeout[0],
|
||||
msg.timeoutParam[0],
|
||||
resultIdent,
|
||||
responseMsgId)
|
||||
if hasReqIds:
|
||||
|
@ -807,61 +698,51 @@ macro p2pProtocolImpl(name: static[string],
|
|||
initFuture `resultIdent`
|
||||
discard `registerRequestCall`
|
||||
|
||||
of rlpxResponse:
|
||||
of msgResponse:
|
||||
let reqIdVal = if hasReqIds: `reqId` else: newLit(-1)
|
||||
callResolvedResponseFuture.add quote do:
|
||||
`resolveResponseFuture`(`msgSender`,
|
||||
`perPeerMsgId`(`msgSender`, `msgRecord`),
|
||||
`perPeerMsgId`(`msgSender`, `msgRecName`),
|
||||
addr(`receivedMsg`),
|
||||
`reqIdVal`)
|
||||
if hasReqIds:
|
||||
paramsToWrite.add newDotExpr(sendTo, ident"id")
|
||||
|
||||
if n.body.kind != nnkEmpty:
|
||||
# implement the receiving thunk proc that deserialzed the
|
||||
# message parameters and calls the user proc:
|
||||
userHandlerProc = n.copyNimTree
|
||||
userHandlerProc.name = genSym(nskProc, msgName)
|
||||
of msgHandshake, msgNotification: discard
|
||||
|
||||
if msg.userHandler != nil:
|
||||
var extraDefs: NimNode
|
||||
if msgKind == rlpxRequest:
|
||||
let peer = userHandlerProc.params[1][0]
|
||||
if msgKind == msgRequest:
|
||||
let peer = msg.userHandler.params[1][0]
|
||||
let response = ident"response"
|
||||
if hasReqIds:
|
||||
extraDefs = quote do:
|
||||
let `response` = ResponseWithId[`responseRecord`](peer: `peer`, id: `reqId`)
|
||||
let `response` = `ResponseWithId`[`responseRecord`](peer: `peer`, id: `reqId`)
|
||||
else:
|
||||
extraDefs = quote do:
|
||||
let `response` = Response[`responseRecord`](`peer`)
|
||||
let `response` = `Response`[`responseRecord`](`peer`)
|
||||
|
||||
augmentUserHandler userHandlerProc, msgId, msgKind, extraDefs
|
||||
msg.userHandler.addPreludeDefs extraDefs
|
||||
|
||||
# This is the call to the user supplied handled. Here we add only the
|
||||
# initial peer param, while the rest of the params will be added later.
|
||||
userHandlerCall = newCall(userHandlerProc.name, msgSender)
|
||||
userHandlerCall = newCall(msg.userHandler.name, msgSender)
|
||||
|
||||
if hasReqIds:
|
||||
userHandlerProc.params.insert(2, newIdentDefs(reqId, ident"int"))
|
||||
msg.userHandler.params.insert(2, newIdentDefs(reqId, ident"int"))
|
||||
userHandlerCall.add reqId
|
||||
|
||||
# When there is a user handler, it must be awaited in the thunk proc.
|
||||
# Above, by default `awaitUserHandler` is set to a no-op statement list.
|
||||
awaitUserHandler = newCall("await", userHandlerCall)
|
||||
|
||||
outRecvProcs.add(userHandlerProc)
|
||||
p.outRecvProcs.add(msg.userHandler)
|
||||
|
||||
for param, paramType in n.typedParams(skip = 1):
|
||||
# This is a fragment of the sending proc that
|
||||
# serializes each of the passed parameters:
|
||||
paramsToWrite.add param
|
||||
|
||||
# Each message has a corresponding record type.
|
||||
# Here, we create its fields one by one:
|
||||
msgRecordFields.add newTree(nnkIdentDefs,
|
||||
newTree(nnkPostfix, ident("*"), param), # The fields are public
|
||||
chooseFieldType(paramType), # some types such as openarray
|
||||
# are automatically remapped
|
||||
newEmptyNode())
|
||||
|
||||
# The received RLP data is deserialized to a local variable of
|
||||
# the message-specific type. This is done field by field here:
|
||||
let msgNameLit = newLit(msgName)
|
||||
|
@ -885,7 +766,7 @@ macro p2pProtocolImpl(name: static[string],
|
|||
var thunkProc = quote do:
|
||||
proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) {.gcsafe.} =
|
||||
var `receivedRlp` = data
|
||||
var `receivedMsg` {.noinit.}: `msgRecord`
|
||||
var `receivedMsg` {.noinit.}: `msgRecName`
|
||||
`readParamsPrelude`
|
||||
`readParams`
|
||||
`awaitUserHandler`
|
||||
|
@ -894,59 +775,48 @@ macro p2pProtocolImpl(name: static[string],
|
|||
for p in userPragmas: thunkProc.addPragma p
|
||||
|
||||
case msgKind
|
||||
of rlpxRequest: thunkProc.applyDecorator incomingRequestThunkDecorator
|
||||
of rlpxResponse: thunkProc.applyDecorator incomingResponseThunkDecorator
|
||||
of msgRequest: thunkProc.applyDecorator p.incomingRequestThunkDecorator
|
||||
of msgResponse: thunkProc.applyDecorator p.incomingResponseThunkDecorator
|
||||
else: discard
|
||||
|
||||
thunkProc.addPragma ident"async"
|
||||
|
||||
outRecvProcs.add thunkProc
|
||||
|
||||
outTypes.add quote do:
|
||||
# This is a type featuring a single field for each message param:
|
||||
type `msgRecord`* = `msgRecordBody`
|
||||
|
||||
# Add a helper template for accessing the message type:
|
||||
# e.g. p2p.hello:
|
||||
template `msgIdent`*(T: type `protoNameIdent`): type = `msgRecord`
|
||||
|
||||
# Add a helper template for obtaining the message Id for
|
||||
# a particular message type:
|
||||
template msgId*(T: type `msgRecord`): int = `msgId`
|
||||
template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent`
|
||||
p.outRecvProcs.add thunkProc
|
||||
|
||||
var msgSendProc = n
|
||||
let msgSendProcName = n.name
|
||||
outSendProcs.add msgSendProc
|
||||
p.outSendProcs.add msgSendProc
|
||||
|
||||
# TODO: check that the first param has the correct type
|
||||
msgSendProc.params[1][0] = sendTo
|
||||
msgSendProc.addPragma ident"gcsafe"
|
||||
|
||||
# Add a timeout parameter for all request procs
|
||||
case msgKind
|
||||
of rlpxRequest:
|
||||
msgSendProc.params.add reqTimeout
|
||||
of rlpxResponse:
|
||||
of msgRequest:
|
||||
# Add a timeout parameter for all request procs
|
||||
msgSendProc.params.add msg.timeoutParam
|
||||
of msgResponse:
|
||||
# A response proc must be called with a response object that originates
|
||||
# from a certain request. Here we change the Peer parameter at position
|
||||
# 1 to the correct strongly-typed ResponseType. The incoming procs still
|
||||
# gets the normal Peer paramter.
|
||||
# let rsp = bindSym "Response"
|
||||
# let rspId = bindSym "ResponseWithId"
|
||||
let
|
||||
ResponseTypeHead = if useRequestIds: bindSym"ResponseWithId"
|
||||
else: bindSym"Response"
|
||||
ResponseType = newTree(nnkBracketExpr, ResponseTypeHead, msgRecord)
|
||||
ResponseTypeHead = if p.useRequestIds: ResponseWithId
|
||||
else: Response
|
||||
ResponseType = newTree(nnkBracketExpr, ResponseTypeHead, msgRecName)
|
||||
|
||||
msgSendProc.params[1][1] = ResponseType
|
||||
|
||||
outSendProcs.add quote do:
|
||||
p.outSendProcs.add quote do:
|
||||
template send*(r: `ResponseType`, args: varargs[untyped]): auto =
|
||||
`msgSendProcName`(r, args)
|
||||
else: discard
|
||||
|
||||
# We change the return type of the sending proc to a Future.
|
||||
# If this is a request proc, the future will return the response record.
|
||||
let rt = if msgKind != rlpxRequest: ident"void"
|
||||
let rt = if msgKind != msgRequest: ident"void"
|
||||
else: newTree(nnkBracketExpr, Option, responseRecord)
|
||||
msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt)
|
||||
|
||||
|
@ -956,7 +826,7 @@ macro p2pProtocolImpl(name: static[string],
|
|||
let `msgBytes` = `finish`(`rlpWriter`)
|
||||
|
||||
var sendCall = newCall(sendMsg, msgRecipient, msgBytes)
|
||||
let senderEpilogue = if msgKind == rlpxRequest:
|
||||
let senderEpilogue = if msgKind == msgRequest:
|
||||
# In RLPx requests, the returned future was allocated here and passed
|
||||
# to `registerRequest`. It's already assigned to the result variable
|
||||
# of the proc, so we just wait for the sending operation to complete
|
||||
|
@ -969,7 +839,7 @@ macro p2pProtocolImpl(name: static[string],
|
|||
quote: return `sendCall`
|
||||
|
||||
let perPeerMsgIdValue = if isSubprotocol:
|
||||
newCall(perPeerMsgIdImpl, msgRecipient, protocol, newLit(msgId))
|
||||
newCall(perPeerMsgIdImpl, msgRecipient, p.protocolInfoVar, newLit(msgId))
|
||||
else:
|
||||
newLit(msgId)
|
||||
|
||||
|
@ -983,8 +853,39 @@ macro p2pProtocolImpl(name: static[string],
|
|||
for p in paramsToWrite:
|
||||
appendParams.add newCall(append, rlpWriter, p)
|
||||
|
||||
# Make the send proc public
|
||||
msgSendProc.name = newTree(nnkPostfix, ident("*"), msgSendProc.name)
|
||||
if msgKind == msgHandshake:
|
||||
var
|
||||
rawSendProc = genSym(nskProc, msgName & "RawSend")
|
||||
handshakeExchanger = newProc(name = msg.identWithExportMarker,
|
||||
procType = nnkTemplateDef)
|
||||
|
||||
handshakeExchanger.params = msgSendProc.params.copyNimTree
|
||||
handshakeExchanger.params.add msg.timeoutParam
|
||||
handshakeExchanger.params[0] = newTree(nnkBracketExpr, ident("Future"), msgRecName)
|
||||
|
||||
var
|
||||
forwardCall = newCall(rawSendProc).appendAllParams(handshakeExchanger)
|
||||
peerVariable = ident"peer"
|
||||
peerValue = forwardCall[1]
|
||||
timeoutValue = msg.timeoutParam[0]
|
||||
handshakeImpl = ident"handshakeImpl"
|
||||
|
||||
forwardCall[1] = peerVariable
|
||||
forwardCall.del(forwardCall.len - 1)
|
||||
|
||||
handshakeExchanger.body = quote do:
|
||||
let `peerVariable` = `peerValue`
|
||||
let sendingFuture = `forwardCall`
|
||||
`handshakeImpl`(`peerVariable`,
|
||||
sendingFuture,
|
||||
`nextMsg`(`peerVariable`, `msgRecName`),
|
||||
`timeoutValue`)
|
||||
|
||||
msgSendProc.name = rawSendProc
|
||||
p.outSendProcs.add handshakeExchanger
|
||||
else:
|
||||
# Make the send proc public
|
||||
msgSendProc.name = msg.identWithExportMarker
|
||||
|
||||
let initWriter = quote do:
|
||||
var `rlpWriter` = `initRlpWriter`()
|
||||
|
@ -1003,123 +904,24 @@ macro p2pProtocolImpl(name: static[string],
|
|||
`finalizeRequest`
|
||||
`senderEpilogue`
|
||||
|
||||
if msgKind == rlpxRequest:
|
||||
msgSendProc.applyDecorator outgoingRequestDecorator
|
||||
if msgKind == msgRequest:
|
||||
msgSendProc.applyDecorator p.outgoingRequestDecorator
|
||||
|
||||
outProcRegistrations.add(
|
||||
newCall(bindSym("registerMsg"),
|
||||
protocol,
|
||||
p.outProcRegistrations.add(
|
||||
newCall(registerMsg,
|
||||
p.protocolInfoVar,
|
||||
newIntLitNode(msgId),
|
||||
newStrLitNode($n.name),
|
||||
thunkName,
|
||||
newTree(nnkBracketExpr, messagePrinter, msgRecord),
|
||||
newTree(nnkBracketExpr, requestResolver, msgRecord),
|
||||
newTree(nnkBracketExpr, nextMsgResolver, msgRecord)))
|
||||
newTree(nnkBracketExpr, messagePrinter, msgRecName),
|
||||
newTree(nnkBracketExpr, requestResolver, msgRecName),
|
||||
newTree(nnkBracketExpr, nextMsgResolver, msgRecName)))
|
||||
|
||||
outTypes.add quote do:
|
||||
# Create a type acting as a pseudo-object representing the protocol
|
||||
# (e.g. p2p)
|
||||
type `protoNameIdent`* = object
|
||||
|
||||
if peerState != nil:
|
||||
outTypes.add quote do:
|
||||
template State*(P: type `protoNameIdent`): type = `peerState`
|
||||
|
||||
if networkState != nil:
|
||||
outTypes.add quote do:
|
||||
template NetworkState*(P: type `protoNameIdent`): type = `networkState`
|
||||
|
||||
for n in body:
|
||||
case n.kind
|
||||
of {nnkCall, nnkCommand}:
|
||||
if eqIdent(n[0], "nextID"):
|
||||
# By default message IDs are assigned in increasing order
|
||||
# `nextID` can be used to skip some of the numeric slots
|
||||
if n.len == 2 and n[1].kind == nnkIntLit:
|
||||
nextId = n[1].intVal.int
|
||||
else:
|
||||
macros.error("nextID expects a single int value", n)
|
||||
elif eqIdent(n[0], "requestResponse"):
|
||||
# `requestResponse` can be given a block of 2 or more procs.
|
||||
# The last one is considered to be a response message, while
|
||||
# all preceeding ones are requests triggering the response.
|
||||
# The system makes sure to automatically insert a hidden `reqId`
|
||||
# parameter used to discriminate the individual messages.
|
||||
block processReqResp:
|
||||
if n.len == 2 and n[1].kind == nnkStmtList:
|
||||
var procs = newSeq[NimNode](0)
|
||||
for def in n[1]:
|
||||
if def.kind == nnkProcDef:
|
||||
procs.add(def)
|
||||
if procs.len > 1:
|
||||
let responseMsgId = nextId + procs.len - 1
|
||||
let responseRecord = addMsgHandler(responseMsgId,
|
||||
procs[^1],
|
||||
msgKind = rlpxResponse)
|
||||
for i in 0 .. procs.len - 2:
|
||||
discard addMsgHandler(nextId + i, procs[i],
|
||||
msgKind = rlpxRequest,
|
||||
responseMsgId = responseMsgId,
|
||||
responseRecord = responseRecord)
|
||||
|
||||
inc nextId, procs.len
|
||||
|
||||
# we got all the way to here, so everything is fine.
|
||||
# break the block so it doesn't reach the error call below
|
||||
break processReqResp
|
||||
macros.error("requestResponse expects a block with at least two proc definitions")
|
||||
elif eqIdent(n[0], "onPeerConnected"):
|
||||
handshake = liftEventHandler(n[1], "Handshake")
|
||||
elif eqIdent(n[0], "onPeerDisconnected"):
|
||||
disconnectHandler = liftEventHandler(n[1], "PeerDisconnect")
|
||||
else:
|
||||
macros.error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
|
||||
of nnkProcDef:
|
||||
discard addMsgHandler(nextId, n)
|
||||
inc nextId
|
||||
|
||||
of nnkCommentStmt:
|
||||
discard
|
||||
|
||||
else:
|
||||
macros.error("illegal syntax in a P2P protocol definition", n)
|
||||
|
||||
let peerInit = if peerState == nil: newNilLit()
|
||||
else: newTree(nnkBracketExpr, createPeerState, peerState)
|
||||
|
||||
let netInit = if networkState == nil: newNilLit()
|
||||
else: newTree(nnkBracketExpr, createNetworkState, networkState)
|
||||
|
||||
result = newNimNode(nnkStmtList)
|
||||
result.add outTypes
|
||||
result.add quote do:
|
||||
# One global variable per protocol holds the protocol run-time data
|
||||
var p = `initProtocol`(`shortName`, `version`, `peerInit`, `netInit`)
|
||||
var `protocol` = addr p
|
||||
|
||||
# The protocol run-time data is available as a pseudo-field
|
||||
# (e.g. `p2p.protocolInfo`)
|
||||
template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
|
||||
|
||||
result.add outSendProcs, outRecvProcs, outProcRegistrations
|
||||
result.add quote do:
|
||||
setEventHandlers(`protocol`, `handshake`, `disconnectHandler`)
|
||||
|
||||
result.add newCall(bindSym("registerProtocol"), protocol)
|
||||
|
||||
when defined(debugRlpxProtocol) or defined(debugMacros):
|
||||
echo repr(result)
|
||||
|
||||
macro p2pProtocol*(protocolOptions: untyped, body: untyped): untyped =
|
||||
let protoName = $(protocolOptions[0])
|
||||
result = protocolOptions
|
||||
result[0] = bindSym"p2pProtocolImpl"
|
||||
result.add(newTree(nnkExprEqExpr,
|
||||
ident("name"),
|
||||
newLit(protoName)))
|
||||
result.add(newTree(nnkExprEqExpr,
|
||||
ident("body"),
|
||||
body))
|
||||
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
|
||||
return newCall(initProtocol,
|
||||
newLit(p.shortName),
|
||||
newLit(p.version),
|
||||
p.peerInit, p.netInit)
|
||||
|
||||
p2pProtocol devp2p(version = 0, shortName = "p2p"):
|
||||
proc hello(peer: Peer,
|
||||
|
@ -1162,7 +964,7 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[voi
|
|||
|
||||
return all(futures)
|
||||
|
||||
proc handshakeImpl[T](peer: Peer,
|
||||
proc handshakeImpl*[T](peer: Peer,
|
||||
sendFut: Future[void],
|
||||
responseFut: Future[T],
|
||||
timeout: Duration): Future[T] {.async.} =
|
||||
|
@ -1180,21 +982,6 @@ proc handshakeImpl[T](peer: Peer,
|
|||
else:
|
||||
return responseFut.read
|
||||
|
||||
proc handshakeImpl(peer: Peer,
|
||||
sendFut: Future[void],
|
||||
timeout: Duration,
|
||||
HandshakeType: type): Future[HandshakeType] =
|
||||
handshakeImpl(peer, sendFut, nextMsg(peer, HandshakeType), timeout)
|
||||
|
||||
macro handshake*(peer: Peer, timeout: untyped, sendCall: untyped): untyped =
|
||||
let
|
||||
msgName = $sendCall[0]
|
||||
msgType = newDotExpr(ident"CurrentProtocol", ident(msgName))
|
||||
|
||||
sendCall.insert(1, peer)
|
||||
|
||||
result = newCall(bindSym"handshakeImpl", peer, sendCall, timeout, msgType)
|
||||
|
||||
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
peer.connectionState = Disconnecting
|
||||
|
|
|
@ -46,12 +46,12 @@ p2pProtocol eth(version = protocolVersion,
|
|||
chain = network.chain
|
||||
bestBlock = chain.getBestBlockHeader
|
||||
|
||||
let m = await peer.handshake(timeout = chronos.seconds(10),
|
||||
status(protocolVersion,
|
||||
network.networkId,
|
||||
bestBlock.difficulty,
|
||||
bestBlock.blockHash,
|
||||
chain.genesisHash))
|
||||
let m = await peer.status(protocolVersion,
|
||||
network.networkId,
|
||||
bestBlock.difficulty,
|
||||
bestBlock.blockHash,
|
||||
chain.genesisHash,
|
||||
timeout = chronos.seconds(10))
|
||||
|
||||
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
|
||||
trace "suitable peer", peer
|
||||
|
@ -61,12 +61,13 @@ p2pProtocol eth(version = protocolVersion,
|
|||
peer.state.bestDifficulty = m.totalDifficulty
|
||||
peer.state.bestBlockHash = m.bestHash
|
||||
|
||||
proc status(peer: Peer,
|
||||
protocolVersion: uint,
|
||||
networkId: uint,
|
||||
totalDifficulty: DifficultyInt,
|
||||
bestHash: KeccakHash,
|
||||
genesisHash: KeccakHash)
|
||||
handshake:
|
||||
proc status(peer: Peer,
|
||||
protocolVersion: uint,
|
||||
networkId: uint,
|
||||
totalDifficulty: DifficultyInt,
|
||||
bestHash: KeccakHash,
|
||||
genesisHash: KeccakHash)
|
||||
|
||||
proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) =
|
||||
discard
|
||||
|
|
|
@ -165,11 +165,8 @@ p2pProtocol les(version = lesVersion,
|
|||
outgoingRequestDecorator = outgoingRequestDecorator,
|
||||
incomingRequestDecorator = incomingRequestDecorator,
|
||||
incomingResponseThunkDecorator = incomingResponseDecorator):
|
||||
|
||||
## Handshake
|
||||
##
|
||||
|
||||
proc status(p: Peer, values: openarray[KeyValuePair])
|
||||
handshake:
|
||||
proc status(p: Peer, values: openarray[KeyValuePair])
|
||||
|
||||
onPeerConnected do (peer: Peer):
|
||||
let
|
||||
|
@ -208,7 +205,7 @@ p2pProtocol les(version = lesVersion,
|
|||
lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType)
|
||||
|
||||
let
|
||||
s = await peer.handshake(timeout = chronos.seconds(10), status(lesProperties))
|
||||
s = await peer.status(lesProperties, timeout = chronos.seconds(10))
|
||||
peerNetworkId = s.values.getRequiredValue(keyNetworkId, uint)
|
||||
peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash)
|
||||
peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint)
|
||||
|
|
|
@ -733,11 +733,11 @@ p2pProtocol Whisper(version = whisperVersion,
|
|||
whisperNet = peer.networkState
|
||||
whisperPeer = peer.state
|
||||
|
||||
let m = await handshake(peer, timeout = chronos.milliseconds(500),
|
||||
status(whisperVersion,
|
||||
cast[uint](whisperNet.config.powRequirement),
|
||||
@(whisperNet.config.bloom),
|
||||
whisperNet.config.isLightNode))
|
||||
let m = await peer.status(whisperVersion,
|
||||
cast[uint](whisperNet.config.powRequirement),
|
||||
@(whisperNet.config.bloom),
|
||||
whisperNet.config.isLightNode,
|
||||
timeout = chronos.milliseconds(500))
|
||||
|
||||
if m.protocolVersion == whisperVersion:
|
||||
debug "Whisper peer", peer, whisperVersion
|
||||
|
@ -769,12 +769,12 @@ p2pProtocol Whisper(version = whisperVersion,
|
|||
|
||||
debug "Whisper peer initialized", peer
|
||||
|
||||
proc status(peer: Peer,
|
||||
protocolVersion: uint,
|
||||
powConverted: uint,
|
||||
bloom: Bytes,
|
||||
isLightNode: bool) =
|
||||
discard
|
||||
handshake:
|
||||
proc status(peer: Peer,
|
||||
protocolVersion: uint,
|
||||
powConverted: uint,
|
||||
bloom: Bytes,
|
||||
isLightNode: bool)
|
||||
|
||||
proc messages(peer: Peer, envelopes: openarray[Envelope]) =
|
||||
if not peer.state.initialized:
|
||||
|
@ -1051,7 +1051,6 @@ proc setLightNode*(node: EthereumNode, isLightNode: bool) =
|
|||
## setting is only communicated at peer handshake
|
||||
node.protocolState(Whisper).config.isLightNode = isLightNode
|
||||
|
||||
|
||||
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
|
||||
## Apply a Whisper configuration
|
||||
## NOTE: Should be run before connection is made with peers as some
|
||||
|
|
|
@ -71,7 +71,7 @@ p2pProtocol xyz(version = 1,
|
|||
proc bar(p: Peer, i: int, s: string)
|
||||
|
||||
requestResponse:
|
||||
proc xyzReq(p: Peer, n: int, timeout = 3000) =
|
||||
proc xyzReq(p: Peer, n: int, timeout = 3.seconds) =
|
||||
echo "got req ", n
|
||||
|
||||
proc xyzRes(p: Peer, data: string) =
|
||||
|
|
Loading…
Reference in New Issue