More shared code extracted out of RLPx

This commit is contained in:
Zahary Karadjov 2019-05-29 11:16:59 +03:00
parent f761889518
commit 76bb2cde5c
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
5 changed files with 148 additions and 84 deletions

View File

@ -23,6 +23,48 @@ template networkState*(connection: Peer, Protocol: type): untyped =
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard
proc initFuture[T](loc: var Future[T]) =
loc = newFuture[T]()
proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} =
var f = Future[Option[MsgType]](future)
if not f.finished:
if msg != nil:
f.complete some(cast[ptr MsgType](msg)[])
else:
f.complete none(MsgType)
else:
# This future was already resolved, but let's do some sanity checks
# here. The only reasonable explanation is that the request should
# have timed out.
if msg != nil:
if f.read.isSome:
doAssert false, "trying to resolve a request twice"
else:
doAssert false, "trying to resolve a timed out request with a value"
else:
try:
if not f.read.isSome:
doAssert false, "a request timed out twice"
# This can except when the future still completes with an error.
# E.g. the `sendMsg` fails because of an already closed transport or a
# broken pipe
except TransportOsError:
# E.g. broken pipe
trace "TransportOsError during request", err = getCurrentExceptionMsg()
except TransportError:
trace "Transport got closed during request"
except:
debug "Exception in requestResolver()",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
raise
proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) =
sendFut.addCallback() do (arg: pointer):
if not sendFut.error.isNil:
resFut.fail(sendFut.error)
proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
result = ""
# TODO: uncommenting the line below increases the compile-time
# tremendously (for reasons not yet known)
# result = $(cast[ptr MsgType](msg)[])

View File

@ -1,6 +1,6 @@
import
macros,
std_shims/macros_shim, chronos/timer
std_shims/macros_shim, chronos
type
MessageKind* = enum
@ -18,6 +18,10 @@ type
recIdent*: NimNode
recBody*: NimNode
userHandler*: NimNode
protocol*: P2PProtocol
sendProcPeerParam*: NimNode
sendProcMsgParams*: seq[NimNode]
Request* = ref object
queries*: seq[Message]
@ -69,12 +73,18 @@ type
# Bound symbols to the back-end run-time types and procs
PeerType*: NimNode
NetworkType*: NimNode
SerializationFormat*: NimNode
registerProtocol*: NimNode
setEventHandlers*: NimNode
BackendFactory* = proc (p: P2PProtocol): Backend
P2PBackendError* = object of CatchableError
InvalidMsgError* = object of P2PBackendError
ProtocolInfoBase* = object
const
defaultReqTimeout = 10.seconds
@ -296,24 +306,108 @@ proc createSendProc*(msg: Message, procType = nnkProcDef): NimNode =
# TODO: file an issue:
# macros.newProc and macros.params doesn't work with nnkMacroDef
let pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
# createSendProc must be called only once
assert msg.sendProcPeerParam == nil
let
pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
else: newEmptyNode()
result = newNimNode(procType).add(
msg.identWithExportMarker, ## name
newEmptyNode(),
newEmptyNode(),
msg.procDef.params.copy, ## params
copy msg.procDef.params, ## params
pragmas,
newEmptyNode(),
newStmtList()) ## body
for param, paramType in result.typedParams():
if msg.sendProcPeerParam == nil:
msg.sendProcPeerParam = param
else:
msg.sendProcMsgParams.add param
if msg.kind in {msgHandshake, msgRequest}:
result[3].add msg.timeoutParam
result[3][0] = if procType == nnkMacroDef: ident "untyped"
else: newTree(nnkBracketExpr, ident("Future"), msg.recIdent)
const tracingEnabled = defined(p2pdump)
when tracingEnabled:
proc logSentMsgFields(peer: NimNode,
protocolInfo: NimNode,
msgName: string,
fields: openarray[NimNode]): NimNode =
## This generates the tracing code inserted in the message sending procs
## `fields` contains all the params that were serialized in the message
var tracer = ident("tracer")
result = quote do:
var `tracer` = init StringJsonWriter
beginRecord(`tracer`)
for f in fields:
result.add newCall(bindSym"writeField", tracer, newLit($f), f)
result.add quote do:
endRecord(`tracer`)
logMsgEventImpl("outgoing_msg", `peer`,
`protocolInfo`, `msgName`, getOutput(`tracer`))
proc initFuture*[T](loc: var Future[T]) =
loc = newFuture[T]()
proc createSendProcBody*(msg: Message,
preludeGenerator: proc(stream: NimNode): NimNode,
sendCallGenerator: proc (peer, bytes: NimNode): NimNode): NimNode =
let
outputStream = ident "outputStream"
msgBytes = ident "msgBytes"
writer = ident "writer"
writeField = ident "writeField"
resultIdent = ident "result"
initFuture = bindSym "initFuture"
recipient = msg.sendProcPeerParam
msgRecName = msg.recIdent
Format = msg.protocol.backend.SerializationFormat
prelude = if preludeGenerator.isNil: newStmtList()
else: preludeGenerator(outputStream)
appendParams = newStmtList()
initResultFuture = if msg.kind != msgRequest: newStmtList()
else: newCall(initFuture, resultIdent)
sendCall = sendCallGenerator(recipient, msgBytes)
tracing = when tracingEnabled: logSentMsgFields(recipient,
newLit(msg.protocol.name),
$msg.ident,
msg.sendProcMsgParams)
else: newStmtList()
for param in msg.sendProcMsgParams:
appendParams.add newCall(writeField, writer, newLit($param), param)
result = quote do:
mixin init, WriterType, beginRecord, endRecord, getOutput
`initResultFuture`
var `outputStream` = init OutputStream
`prelude`
var writer = init(WriterType(`Format`), `outputStream`)
var recordStartMemo = beginRecord(writer, `msgRecName`)
`appendParams`
`tracing`
endRecord(writer, recordStartMemo)
let `msgBytes` = getOutput(`outputStream`)
`sendCall`
proc appendAllParams*(node: NimNode, procDef: NimNode, skipFirst = 0): NimNode =
result = node
for p, _ in procDef.typedParams(skip = skipFirst):

View File

@ -62,26 +62,6 @@ when tracingEnabled:
Msg.type.name,
StringJsonWriter.encode(msg))
proc logSentMsgFields(peer: NimNode,
protocolInfo: NimNode,
msgName: string,
fields: openarray[NimNode]): NimNode =
## This generates the tracing code inserted in the message sending procs
## `fields` contains all the params that were serialized in the message
var tracer = ident("tracer")
result = quote do:
var `tracer` = init StringJsonWriter
beginRecord(`tracer`)
for f in fields:
result.add newCall(bindSym"writeField", tracer, newLit($f), f)
result.add quote do:
endRecord(`tracer`)
logMsgEventImpl("outgoing_msg", `peer`,
`protocolInfo`, `msgName`, getOutput(`tracer`))
template logSentMsg(peer: Peer, msg: auto) =
logMsgEvent("outgoing_msg", peer, msg)

View File

@ -101,7 +101,7 @@ type
name*: string
# Private fields:
thunk*: MessageHandler
thunk*: ThunkProc
printer*: MessageContentPrinter
requestResolver*: RequestResolver
nextMsgResolver*: NextMsgResolver
@ -134,7 +134,7 @@ type
# Private types:
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.}
ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.}
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) {.gcsafe.}

View File

@ -53,6 +53,8 @@ proc disconnectAndRaise(peer: Peer,
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
include p2p_backends_helpers
# Dispatcher
#
@ -166,53 +168,13 @@ proc cmp*(lhs, rhs: ProtocolInfo): int =
return int16(lhs.name[i]) - int16(rhs.name[i])
return 0
proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
result = ""
# TODO: uncommenting the line below increases the compile-time
# tremendously (for reasons not yet known)
# result = $(cast[ptr MsgType](msg)[])
proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) {.gcsafe.} =
var reader = msgData
Future[MsgType](future).complete reader.readRecordType(MsgType, MsgType.rlpFieldsCount > 1)
proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} =
var f = Future[Option[MsgType]](future)
if not f.finished:
if msg != nil:
f.complete some(cast[ptr MsgType](msg)[])
else:
f.complete none(MsgType)
else:
# This future was already resolved, but let's do some sanity checks
# here. The only reasonable explanation is that the request should
# have timed out.
if msg != nil:
if f.read.isSome:
doAssert false, "trying to resolve a request twice"
else:
doAssert false, "trying to resolve a timed out request with a value"
else:
try:
if not f.read.isSome:
doAssert false, "a request timed out twice"
# This can except when the future still completes with an error.
# E.g. the `sendMsg` fails because of an already closed transport or a
# broken pipe
except TransportOsError:
# E.g. broken pipe
trace "TransportOsError during request", err = getCurrentExceptionMsg()
except TransportError:
trace "Transport got closed during request"
except:
debug "Exception in requestResolver()",
exc = getCurrentException().name,
err = getCurrentExceptionMsg()
raise
proc registerMsg(protocol: ProtocolInfo,
id: int, name: string,
thunk: MessageHandler,
thunk: ThunkProc,
printer: MessageContentPrinter,
requestResolver: RequestResolver,
nextMsgResolver: NextMsgResolver) =
@ -257,12 +219,6 @@ proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
rlpOut: var RlpWriter) =
let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
doAssert baseMsgId != -1
rlpOut.append(baseMsgId + msgId)
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
template invalidIdError: untyped =
raise newException(UnsupportedMessageError,
@ -275,11 +231,6 @@ proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
return thunk(peer, msgId, msgData)
proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) =
sendFut.addCallback() do (arg: pointer):
if not sendFut.error.isNil:
resFut.fail(sendFut.error)
template compressMsg(peer: Peer, data: Bytes): Bytes =
when useSnappy:
if peer.snappyEnabled:
@ -499,8 +450,6 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
warn "Dropped RLPX message",
msg = peer.dispatcher.messages[nextMsgId].name
include p2p_backends_helpers
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific RLPx message.
## Any messages received while waiting will be dispatched to their
@ -619,7 +568,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
nextMsg = bindSym "nextMsg"
initProtocol = bindSym"initProtocol"
registerMsg = bindSym "registerMsg"
writeMsgId = bindSym "writeMsgId"
perPeerMsgId = bindSym "perPeerMsgId"
perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl"
linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"