Simplified the generation of sender procs

This commit is contained in:
Zahary Karadjov 2019-05-29 18:52:28 +03:00
parent 76bb2cde5c
commit 638442070d
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
3 changed files with 165 additions and 85 deletions

View File

@ -68,3 +68,21 @@ proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
# tremendously (for reasons not yet known)
# result = $(cast[ptr MsgType](msg)[])
proc handshakeImpl[T](peer: Peer,
sendFut: Future[void],
responseFut: Future[T],
timeout: Duration): Future[T] {.async.} =
sendFut.addCallback do (arg: pointer) {.gcsafe.}:
if sendFut.failed:
debug "Handshake message not delivered", peer
doAssert timeout.milliseconds > 0
yield responseFut or sleepAsync(timeout)
if not responseFut.finished:
discard disconnectAndRaise(peer, HandshakeTimeout,
"Protocol handshake was not received in time.")
elif responseFut.failed:
raise responseFut.error
else:
return responseFut.read

View File

@ -19,14 +19,39 @@ type
recBody*: NimNode
userHandler*: NimNode
protocol*: P2PProtocol
sendProcPeerParam*: NimNode
sendProcMsgParams*: seq[NimNode]
response*: Message
Request* = ref object
queries*: seq[Message]
response*: Message
SendProc* = object
## A `SendProc` is a proc used to send a single P2P message
## If it's a Request, the return type will be a Future returing
## the respective Response type. All send procs also have an
## automatically inserted `timeout` parameter.
msg*: Message
## The message being implemented
def*: NimNode
## The definition of the proc
peerParam*: NimNode
## Cached ident for the peer param
msgParams*: seq[NimNode]
## Cached param ident for all values that must be written
## on the wire. The automatically inserted `timeout` is not
## included.
timeoutParam*: NimNode
## Cached ident for the timeout parameter
allDefs*: NimNode
## The final definitions that must be become part of the
## p2pProtocol macro output. May include helper templates.
P2PProtocol* = ref object
# Settings
name*: string
@ -66,7 +91,7 @@ type
Backend* = ref object
# Code generators
implementMsg*: proc (p: P2PProtocol, msg: Message, resp: Message = nil)
implementMsg*: proc (p: P2PProtocol, msg: Message)
implementProtocolInit*: proc (p: P2PProtocol): NimNode
afterProtocolInit*: proc (p: P2PProtocol)
@ -74,6 +99,7 @@ type
PeerType*: NimNode
NetworkType*: NimNode
SerializationFormat*: NimNode
ResponseType*: NimNode
registerProtocol*: NimNode
setEventHandlers*: NimNode
@ -83,8 +109,6 @@ type
P2PBackendError* = object of CatchableError
InvalidMsgError* = object of P2PBackendError
ProtocolInfoBase* = object
const
defaultReqTimeout = 10.seconds
@ -267,8 +291,9 @@ proc ensureTimeoutParam(procDef: NimNode, timeouts: int64): NimNode =
Duration,
newCall(milliseconds, newLit(timeouts)))
proc newMsg(p: P2PProtocol, kind: MessageKind, id: int,
procDef: NimNode, timeoutParam: NimNode = nil): Message =
proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
procDef: NimNode, timeoutParam: NimNode = nil,
response: Message = nil): Message =
if procDef[0].kind == nnkPostfix:
error("p2pProcotol procs are public by default. " &
@ -288,51 +313,89 @@ proc newMsg(p: P2PProtocol, kind: MessageKind, id: int,
# are automatically remapped
newEmptyNode())
result = Message(id: id, ident: msgIdent, kind: kind,
result = Message(protocol: protocol, id: id, ident: msgIdent, kind: kind,
procDef: procDef, recIdent: recName, recBody: recBody,
timeoutParam: timeoutParam)
timeoutParam: timeoutParam, response: response)
if procDef.body.kind != nnkEmpty:
result.userHandler = copy procDef
p.augmentUserHandler result.userHandler
protocol.augmentUserHandler result.userHandler
result.userHandler.name = genSym(nskProc, msgName)
p.messages.add result
protocol.messages.add result
proc identWithExportMarker*(msg: Message): NimNode =
newTree(nnkPostfix, ident("*"), msg.ident)
proc createSendProc*(msg: Message, procType = nnkProcDef): NimNode =
proc createSendProc*(msg: Message,
procType = nnkProcDef,
isRawSender = false): SendProc =
# TODO: file an issue:
# macros.newProc and macros.params doesn't work with nnkMacroDef
# createSendProc must be called only once
assert msg.sendProcPeerParam == nil
let
name = if not isRawSender: msg.identWithExportMarker
else: genSym(nskProc, $msg.ident & "RawSender")
pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
else: newEmptyNode()
result = newNimNode(procType).add(
msg.identWithExportMarker, ## name
var def = newNimNode(procType).add(
name,
newEmptyNode(),
newEmptyNode(),
copy msg.procDef.params, ## params
copy msg.procDef.params,
pragmas,
newEmptyNode(),
newStmtList()) ## body
for param, paramType in result.typedParams():
if msg.sendProcPeerParam == nil:
msg.sendProcPeerParam = param
result.msg = msg
result.def = def
result.allDefs = newStmtList(def)
for param, paramType in def.typedParams():
if result.peerParam.isNil:
result.peerParam = param
else:
msg.sendProcMsgParams.add param
result.msgParams.add param
if msg.kind in {msgHandshake, msgRequest}:
result[3].add msg.timeoutParam
case msg.kind
of msgHandshake, msgRequest:
# Add a timeout parameter for all request procs
let timeout = copy msg.timeoutParam
def[3].add timeout
result.timeoutParam = timeout[0]
result[3][0] = if procType == nnkMacroDef: ident "untyped"
else: newTree(nnkBracketExpr, ident("Future"), msg.recIdent)
of msgResponse:
# A response proc must be called with a response object that originates
# from a certain request. Here we change the Peer parameter at position
# 1 to the correct strongly-typed ResponseType. The incoming procs still
# gets the normal Peer paramter.
let
ResponseType = msg.protocol.backend.ResponseType
sendProcName = msg.ident
assert ResponseType != nil
def[3][1][1] = newTree(nnkBracketExpr, ResponseType, msg.recIdent)
# We create a helper that enables the `response.send()` syntax
# inside the user handler of the request proc:
result.allDefs.add quote do:
template send*(r: `ResponseType`, args: varargs[untyped]): auto =
`sendProcName`(r, args)
of msgNotification:
discard
def[3][0] = if procType == nnkMacroDef:
ident "untyped"
elif msg.kind == msgRequest and not isRawSender:
newTree(nnkBracketExpr, ident("Future"), msg.response.recIdent)
elif msg.kind == msgHandshake and not isRawSender:
newTree(nnkBracketExpr, ident("Future"), msg.recIdent)
else:
newTree(nnkBracketExpr, ident("Future"), ident"void")
const tracingEnabled = defined(p2pdump)
@ -360,10 +423,11 @@ when tracingEnabled:
proc initFuture*[T](loc: var Future[T]) =
loc = newFuture[T]()
proc createSendProcBody*(msg: Message,
proc implementBody*(sendProc: SendProc,
preludeGenerator: proc(stream: NimNode): NimNode,
sendCallGenerator: proc (peer, bytes: NimNode): NimNode): NimNode =
sendCallGenerator: proc (peer, bytes: NimNode): NimNode) =
let
msg = sendProc.msg
outputStream = ident "outputStream"
msgBytes = ident "msgBytes"
writer = ident "writer"
@ -371,7 +435,7 @@ proc createSendProcBody*(msg: Message,
resultIdent = ident "result"
initFuture = bindSym "initFuture"
recipient = msg.sendProcPeerParam
recipient = sendProc.peerParam
msgRecName = msg.recIdent
Format = msg.protocol.backend.SerializationFormat
@ -387,14 +451,13 @@ proc createSendProcBody*(msg: Message,
tracing = when tracingEnabled: logSentMsgFields(recipient,
newLit(msg.protocol.name),
$msg.ident,
msg.sendProcMsgParams)
sendProc.msgParams)
else: newStmtList()
for param in msg.sendProcMsgParams:
for param in sendProc.msgParams:
appendParams.add newCall(writeField, writer, newLit($param), param)
result = quote do:
sendProc.def.body = quote do:
mixin init, WriterType, beginRecord, endRecord, getOutput
`initResultFuture`
@ -408,6 +471,20 @@ proc createSendProcBody*(msg: Message,
let `msgBytes` = getOutput(`outputStream`)
`sendCall`
proc genAwaitUserHandler*(msg: Message, receivedMsg: NimNode,
leadingParams: varargs[NimNode]): NimNode =
if msg.userHandler == nil:
return newStmtList()
var userHandlerCall = newCall(msg.userHandler.name, leadingParams)
for param, paramType in msg.procDef.typedParams(skip = 1):
# If there is user message handler, we'll place a call to it by
# unpacking the fields of the received message:
userHandlerCall.add newDotExpr(receivedMsg, param)
return newCall("await", userHandlerCall)
proc appendAllParams*(node: NimNode, procDef: NimNode, skipFirst = 0): NimNode =
result = node
for p, _ in procDef.typedParams(skip = skipFirst):
@ -465,13 +542,14 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
error "requestResponse expects a block with at least two proc definitions"
var queries = newSeq[Message]()
let responseMsg = p.newMsg(msgResponse, nextId + procs.len - 1, procs[^1])
for i in 0 .. procs.len - 2:
var timeout = ensureTimeoutParam(procs[i], p.timeouts)
queries.add p.newMsg(msgRequest, nextId + i, procs[i], timeout)
queries.add p.newMsg(msgRequest, nextId + i, procs[i], timeout,
response = responseMsg)
p.requests.add Request(
queries: queries,
response: p.newMsg(msgResponse, nextId + procs.len - 1, procs[^1]))
p.requests.add Request(queries: queries, response: responseMsg)
inc nextId, procs.len
@ -556,7 +634,7 @@ proc genCode*(p: P2PProtocol): NimNode =
for req in p.requests:
p.backend.implementMsg p, req.response
for query in req.queries: p.backend.implementMsg(p, query, req.response)
for query in req.queries: p.backend.implementMsg(p, query)
result = newStmtList()
result.add p.genTypeSection()

View File

@ -16,6 +16,7 @@ logScope:
const
devp2pVersion* = 4
maxMsgSize = 1024 * 1024
HandshakeTimeout = BreachOfProtocol
include p2p_tracing
@ -535,10 +536,10 @@ proc dispatchMessages*(peer: Peer) {.async.} =
template applyDecorator(p: NimNode, decorator: NimNode) =
if decorator.kind != nnkNilLit: p.addPragma decorator
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
let
resultIdent = ident "result"
isSubprotocol = p.version > 0
isSubprotocol = protocol.version > 0
Option = bindSym "Option"
# XXX: Binding the int type causes instantiation failure for some reason
# Int = bindSym "int"
@ -571,7 +572,9 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
perPeerMsgId = bindSym "perPeerMsgId"
perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl"
linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"
shortName = if p.shortName.len > 0: p.shortName else: p.name
handshakeImpl = bindSym "handshakeImpl"
shortName = if protocol.shortName.len > 0: protocol.shortName
else: protocol.name
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
doAssert shortName.len == 3
@ -583,17 +586,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
result.PeerType = Peer
result.NetworkType = EthereumNode
result.implementMsg = proc (p: P2PProtocol, msg: Message, resp: Message = nil) =
result.implementMsg = proc (protocol: P2PProtocol, msg: Message) =
var
msgId = msg.id
msgRecName = msg.recIdent
msgKind = msg.kind
n = msg.procDef
responseMsgId = if resp != nil: resp.id else: -1
responseRecord = if resp != nil: resp.recIdent else: nil
responseMsgId = if msg.response != nil: msg.response.id else: -1
responseRecord = if msg.response != nil: msg.response.recIdent else: nil
msgIdent = n.name
msgName = $msgIdent
hasReqIds = p.useRequestIds and msgKind in {msgRequest, msgResponse}
hasReqIds = protocol.useRequestIds and msgKind in {msgRequest, msgResponse}
userPragmas = n.pragma
# variables used in the sending procs
@ -684,7 +687,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
# Above, by default `awaitUserHandler` is set to a no-op statement list.
awaitUserHandler = newCall("await", userHandlerCall)
p.outRecvProcs.add(msg.userHandler)
protocol.outRecvProcs.add msg.userHandler
for param, paramType in n.typedParams(skip = 1):
# This is a fragment of the sending proc that
@ -723,17 +726,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
for p in userPragmas: thunkProc.addPragma p
case msgKind
of msgRequest: thunkProc.applyDecorator p.incomingRequestThunkDecorator
of msgResponse: thunkProc.applyDecorator p.incomingResponseThunkDecorator
of msgRequest: thunkProc.applyDecorator protocol.incomingRequestThunkDecorator
of msgResponse: thunkProc.applyDecorator protocol.incomingResponseThunkDecorator
else: discard
thunkProc.addPragma ident"async"
p.outRecvProcs.add thunkProc
protocol.outRecvProcs.add thunkProc
var msgSendProc = n
let msgSendProcName = n.name
p.outSendProcs.add msgSendProc
protocol.outSendProcs.add msgSendProc
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = sendTo
@ -751,13 +754,13 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
# let rsp = bindSym "Response"
# let rspId = bindSym "ResponseWithId"
let
ResponseTypeHead = if p.useRequestIds: ResponseWithId
ResponseTypeHead = if protocol.useRequestIds: ResponseWithId
else: Response
ResponseType = newTree(nnkBracketExpr, ResponseTypeHead, msgRecName)
msgSendProc.params[1][1] = ResponseType
p.outSendProcs.add quote do:
protocol.outSendProcs.add quote do:
template send*(r: `ResponseType`, args: varargs[untyped]): auto =
`msgSendProcName`(r, args)
else: discard
@ -787,7 +790,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
quote: return `sendCall`
let perPeerMsgIdValue = if isSubprotocol:
newCall(perPeerMsgIdImpl, msgRecipient, p.protocolInfoVar, newLit(msgId))
newCall(perPeerMsgIdImpl, msgRecipient, protocol.protocolInfoVar, newLit(msgId))
else:
newLit(msgId)
@ -798,8 +801,8 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
newCall(startList, rlpWriter, newLit(paramCount)),
appendParams)
for p in paramsToWrite:
appendParams.add newCall(append, rlpWriter, p)
for param in paramsToWrite:
appendParams.add newCall(append, rlpWriter, param)
if msgKind == msgHandshake:
var
@ -813,10 +816,9 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
forwardCall = newCall(rawSendProc).appendAllParams(handshakeExchanger)
peerVariable = ident"peer"
peerVariable = ident "peer"
peerValue = forwardCall[1]
timeoutValue = msg.timeoutParam[0]
handshakeImpl = ident"handshakeImpl"
forwardCall[1] = peerVariable
forwardCall.del(forwardCall.len - 1)
@ -830,7 +832,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
`timeoutValue`)
msgSendProc.name = rawSendProc
p.outSendProcs.add handshakeExchanger
protocol.outSendProcs.add handshakeExchanger
else:
# Make the send proc public
msgSendProc.name = msg.identWithExportMarker
@ -853,11 +855,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
`senderEpilogue`
if msgKind == msgRequest:
msgSendProc.applyDecorator p.outgoingRequestDecorator
msgSendProc.applyDecorator protocol.outgoingRequestDecorator
p.outProcRegistrations.add(
protocol.outProcRegistrations.add(
newCall(registerMsg,
p.protocolInfoVar,
protocol.protocolInfoVar,
newIntLitNode(msgId),
newStrLitNode($n.name),
thunkName,
@ -865,11 +867,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
newTree(nnkBracketExpr, requestResolver, msgRecName),
newTree(nnkBracketExpr, nextMsgResolver, msgRecName)))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
result.implementProtocolInit = proc (protocol: P2PProtocol): NimNode =
return newCall(initProtocol,
newLit(p.shortName),
newLit(p.version),
p.peerInit, p.netInit)
newLit(protocol.shortName),
newLit(protocol.version),
protocol.peerInit, protocol.netInit)
p2pProtocol devp2p(version = 0, shortName = "p2p"):
proc hello(peer: Peer,
@ -912,24 +914,6 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[voi
return all(futures)
proc handshakeImpl*[T](peer: Peer,
sendFut: Future[void],
responseFut: Future[T],
timeout: Duration): Future[T] {.async.} =
sendFut.addCallback do (arg: pointer) {.gcsafe.}:
if sendFut.failed:
debug "Handshake message not delivered", peer
doAssert timeout.milliseconds > 0
yield responseFut or sleepAsync(timeout)
if not responseFut.finished:
discard disconnectAndRaise(peer, BreachOfProtocol,
"Protocol handshake was not received in time.")
elif responseFut.failed:
raise responseFut.error
else:
return responseFut.read
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting