# beacon_chain # Copyright (c) 2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import std/[sequtils], results, stew/shims/macros, chronos, faststreams/outputs export chronos, results type MessageKind* = enum msgHandshake msgNotification msgRequest msgResponse Message* = ref object id*: int ident*: NimNode kind*: MessageKind procDef*: NimNode timeoutParam*: NimNode recName*: NimNode strongRecName*: NimNode recBody*: NimNode protocol*: P2PProtocol response*: Message userHandler*: NimNode initResponderCall*: NimNode outputParamDef*: NimNode Request* = ref object queries*: seq[Message] response*: Message SendProc* = object ## A `SendProc` is a proc used to send a single P2P message. ## If it's a Request, then the return type will be a Future ## of the respective Response type. All send procs also have ## an automatically inserted `timeout` parameter. msg*: Message ## The message being implemented def*: NimNode ## The definition of the proc peerParam*: NimNode ## Cached ident for the peer param msgParams*: seq[NimNode] ## Cached param ident for all values that must be written ## on the wire. The automatically inserted `timeout` is not ## included. timeoutParam*: NimNode ## Cached ident for the timeout parameter extraDefs*: NimNode ## The response procs have extra templates that must become ## part of the generated code P2PProtocol* = ref object # Settings name*: string version*: int timeouts*: int64 outgoingRequestDecorator*: NimNode incomingRequestDecorator*: NimNode incomingRequestThunkDecorator*: NimNode incomingResponseDecorator*: NimNode incomingResponseThunkDecorator*: NimNode PeerStateType*: NimNode NetworkStateType*: NimNode backend*: Backend # Cached properties nameIdent*: NimNode protocolInfoVar*: NimNode # All messages messages*: seq[Message] # Messages by type: handshake*: Message notifications*: seq[Message] requests*: seq[Request] # Output procs outSendProcs*: NimNode outRecvProcs*: NimNode outProcRegistrations*: NimNode # Event handlers onPeerConnected*: NimNode onPeerDisconnected*: NimNode Backend* = ref object # Code generators implementMsg*: proc (msg: Message) implementProtocolInit*: proc (protocol: P2PProtocol): NimNode afterProtocolInit*: proc (protocol: P2PProtocol) # Bound symbols to the back-end run-time types and procs PeerType*: NimNode NetworkType*: NimNode SerializationFormat*: NimNode ResponderType*: NimNode RequestResultsWrapper*: NimNode registerProtocol*: NimNode setEventHandlers*: NimNode BackendFactory* = proc (p: P2PProtocol): Backend P2PBackendError* = object of CatchableError InvalidMsgError* = object of P2PBackendError const defaultReqTimeout = 10.seconds tracingEnabled = defined(p2pdump) let # Variable names affecting the public interface of the library: peerVar* {.compileTime.} = ident "peer" responseVar* {.compileTime.} = ident "response" streamVar* {.compileTime.} = ident "stream" protocolVar* {.compileTime.} = ident "protocol" deadlineVar* {.compileTime.} = ident "deadline" timeoutVar* {.compileTime.} = ident "timeout" currentProtocolSym* {.compileTime.} = ident "CurrentProtocol" resultIdent* {.compileTime.} = ident "result" # Locally used symbols: Opt {.compileTime.} = ident "Opt" Future {.compileTime.} = ident "Future" Void {.compileTime.} = ident "void" writeField {.compileTime.} = ident "writeField" PROTO {.compileTime.} = ident "PROTO" MSG {.compileTime.} = ident "MSG" template Fut(T): auto = newTree(nnkBracketExpr, Future, T) proc initFuture*[T](loc: var Future[T]) = loc = newFuture[T]() template applyDecorator(p: NimNode, decorator: NimNode) = if decorator.kind != nnkNilLit: p.pragma.insert(0, decorator) when tracingEnabled: proc logSentMsgFields(peer: NimNode, protocolInfo: NimNode, msgName: string, fields: openArray[NimNode]): NimNode = ## This generates the tracing code inserted in the message sending procs ## `fields` contains all the params that were serialized in the message let tracer = ident "tracer" tracerStream = ident "tracerStream" logMsgEventImpl = ident "logMsgEventImpl" result = quote do: var `tracerStream` = memoryOutput() var `tracer` = JsonWriter.init(`tracerStream`) beginRecord(`tracer`) for f in fields: result.add newCall(writeField, tracer, newLit($f), f) result.add quote do: endRecord(`tracer`) `logMsgEventImpl`("outgoing_msg", `peer`, `protocolInfo`, `msgName`, getOutput(`tracerStream`, string)) proc createPeerState[Peer, ProtocolState](peer: Peer): RootRef = var res = new ProtocolState mixin initProtocolState initProtocolState(res, peer) return cast[RootRef](res) proc expectBlockWithProcs*(n: NimNode): seq[NimNode] = template helperName: auto = $n[0] if n.len != 2 or n[1].kind != nnkStmtList: error(helperName & " expects a block", n) for p in n[1]: if p.kind == nnkProcDef: result.add p elif p.kind == nnkCommentStmt: continue else: error(helperName & " expects a proc definition.", p) proc nameOrNil*(procDef: NimNode): NimNode = if procDef != nil: procDef.name else: newNilLit() proc isOutputParamName(paramName: NimNode): bool = eqIdent(paramName, "output") or eqIdent(paramName, "response") proc isOutputParam(param: NimNode): bool = param.len > 0 and param[0].skipPragma.isOutputParamName proc getOutputParam(procDef: NimNode): NimNode = let params = procDef.params for i in countdown(params.len - 1, 1): let param = params[i] if isOutputParam(param): return param proc outputParam*(msg: Message): NimNode = case msg.kind of msgRequest: outputParam(msg.response) of msgResponse: msg.outputParamDef else: raiseAssert "Only requests (and the attached responses) can have output parameters" proc outputParamIdent*(msg: Message): NimNode = let outputParam = msg.outputParam if outputParam != nil: return outputParam[0].skipPragma proc outputParamType*(msg: Message): NimNode = let outputParam = msg.outputParam if outputParam != nil: return outputParam[1] proc refreshParam(n: NimNode): NimNode = result = copyNimTree(n) if n.kind == nnkIdentDefs: for i in 0.. 1: result = quote do: mixin init, writerType, beginRecord, endRecord var `writer` = init(WriterType(`Format`), `outputStream`) var `recordWriterCtx` = beginRecord(`writer`, `RecordType`) `appendParams` endRecord(`writer`, `recordWriterCtx`) else: let param = params[0] result = quote do: var `writer` = init(WriterType(`Format`), `outputStream`) writeValue(`writer`, `param`) proc useStandardBody*(sendProc: SendProc, preSerializationStep: proc(stream: NimNode): NimNode, postSerializationStep: proc(stream: NimNode): NimNode, sendCallGenerator: proc (peer, bytes: NimNode): NimNode) = let msg = sendProc.msg msgBytes = ident "msgBytes" recipient = sendProc.peerParam sendCall = sendCallGenerator(recipient, msgBytes) if sendProc.msgParams.len == 0: sendProc.setBody quote do: var `msgBytes`: seq[byte] `sendCall` return let outputStream = ident "outputStream" msgRecName = msg.recName Format = msg.protocol.backend.SerializationFormat preSerialization = if preSerializationStep.isNil: newStmtList() else: preSerializationStep(outputStream) serialization = writeParamsAsRecord(sendProc.msgParams, outputStream, Format, msgRecName) postSerialization = if postSerializationStep.isNil: newStmtList() else: postSerializationStep(outputStream) tracing = when not tracingEnabled: newStmtList() else: logSentMsgFields(recipient, msg.protocol.protocolInfoVar, $msg.ident, sendProc.msgParams) sendProc.setBody quote do: mixin init, WriterType, beginRecord, endRecord, getOutput let `msgBytes` = try: var `outputStream` = memoryOutput() `preSerialization` `serialization` `postSerialization` `tracing` getOutput(`outputStream`) except IOError: raiseAssert "memoryOutput doesn't raise IOError actually" `sendCall` proc correctSerializerProcParams(params: NimNode) = # A serializer proc is just like a send proc, but: # 1. it has a void return type params[0] = ident "void" # 2. The peer params is replaced with OutputStream params[1] = newIdentDefs(streamVar, bindSym "OutputStream") # 3. The timeout param is removed params.del(params.len - 1) proc createSerializer*(msg: Message, procType = nnkProcDef): NimNode = var serializer = msg.createSendProc(procType, nameSuffix = "Serializer") correctSerializerProcParams serializer.def.params serializer.setBody writeParamsAsRecord( serializer.msgParams, streamVar, msg.protocol.backend.SerializationFormat, msg.recName) return serializer.def proc defineThunk*(msg: Message, thunk: NimNode) = let protocol = msg.protocol case msg.kind of msgRequest: thunk.applyDecorator protocol.incomingRequestThunkDecorator of msgResponse: thunk.applyDecorator protocol.incomingResponseThunkDecorator else: discard protocol.outRecvProcs.add thunk proc genUserHandlerCall*(msg: Message, receivedMsg: NimNode, leadingParams: openArray[NimNode], outputParam: NimNode = nil): NimNode = if msg.userHandler == nil: return newStmtList() result = newCall(msg.userHandler.name, leadingParams) if msg.needsSingleParamInlining: result.add receivedMsg else: var params = toSeq(msg.procDef.typedInputParams(skip = 1)) for p in params: result.add newDotExpr(receivedMsg, p[0]) if outputParam != nil: result.add outputParam proc genAwaitUserHandler*(msg: Message, receivedMsg: NimNode, leadingParams: openArray[NimNode], outputParam: NimNode = nil): NimNode = result = msg.genUserHandlerCall(receivedMsg, leadingParams, outputParam) if result.len > 0: result = newCall("await", result) proc appendAllInputParams*(node: NimNode, procDef: NimNode): NimNode = result = node for p, _ in procDef.typedInputParams(): result.add p proc paramNames*(procDef: NimNode, skipFirst = 0): seq[NimNode] = result = newSeq[NimNode]() for name, _ in procDef.typedParams(skip = skipFirst): result.add name proc netInit*(p: P2PProtocol): NimNode = newNilLit() # if p.NetworkStateType == nil: # newNilLit() # else: # newTree(nnkBracketExpr, bindSym"createNetworkState", # p.backend.NetworkType, # p.NetworkStateType) proc createHandshakeTemplate*(msg: Message, rawSendProc, handshakeImpl, nextMsg: NimNode): SendProc = let handshakeExchanger = msg.createSendProc(procType = nnkTemplateDef) forwardCall = newCall(rawSendProc).appendAllInputParams(handshakeExchanger.def) peerValue = forwardCall[1] msgRecName = msg.recName forwardCall[1] = peerVar forwardCall.del(forwardCall.len - 1) let peerVar = genSym(nskLet ,"peer") handshakeExchanger.setBody quote do: let `peerVar` = `peerValue` let sendingFuture = `forwardCall` `handshakeImpl`(`peerVar`, sendingFuture, `nextMsg`(`peerVar`, `msgRecName`), `timeoutVar`) return handshakeExchanger proc peerInit*(p: P2PProtocol): NimNode = if p.PeerStateType == nil: newNilLit() else: newTree(nnkBracketExpr, bindSym"createPeerState", p.backend.PeerType, p.PeerStateType) proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) = ## This procs handles all DSL statements valid inside a p2pProtocol. ## ## It will populate the protocol's fields such as: ## * handshake ## * requests ## * notifications ## * onPeerConnected ## * onPeerDisconnected ## ## All messages will have properly computed numeric IDs ## for n in protocolBody: case n.kind of {nnkCall, nnkCommand}: if eqIdent(n[0], "requestResponse"): # `requestResponse` can be given a block of 2 or more procs. # The last one is considered to be a response message, while # all preceding ones are requests triggering the response. # The system makes sure to automatically insert a hidden `reqId` # parameter used to discriminate the individual messages. let procs = expectBlockWithProcs(n) if procs.len < 2: error "requestResponse expects a block with at least two proc definitions" var queries = newSeq[Message]() let responseMsg = p.newMsg(msgResponse, procs[^1]) for i in 0 .. procs.len - 2: queries.add p.newMsg(msgRequest, procs[i], response = responseMsg) p.requests.add Request(queries: queries, response: responseMsg) elif eqIdent(n[0], "handshake"): let procs = expectBlockWithProcs(n) if procs.len != 1: error "handshake expects a block with a single proc definition", n if p.handshake != nil: error "The handshake for the protocol is already defined", n p.handshake = p.newMsg(msgHandshake, procs[0]) elif eqIdent(n[0], "onPeerConnected"): p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected") elif eqIdent(n[0], "onPeerDisconnected"): p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected") else: error(repr(n) & " is not a recognized call in P2P protocol definitions", n) of nnkProcDef, nnkIteratorDef: p.addMsg(n) of nnkCommentStmt: discard else: error "Illegal syntax in a P2P protocol definition", n proc genTypeSection*(p: P2PProtocol): NimNode = var protocolName = p.nameIdent peerState = p.PeerStateType networkState= p.NetworkStateType result = newStmtList() result.add quote do: # Create a type acting as a pseudo-object representing the protocol # (e.g. p2p) type `protocolName`* = object if peerState != nil: result.add quote do: template State*(`PROTO`: type `protocolName`): type = `peerState` if networkState != nil: result.add quote do: template NetworkState*(`PROTO`: type `protocolName`): type = `networkState` for msg in p.messages: if msg.procDef == nil: continue let msgName = msg.ident msgRecName = msg.recName msgStrongRecName = msg.strongRecName msgRecBody = msg.recBody result.add quote do: # This is a type featuring a single field for each message param: type `msgStrongRecName`* = `msgRecBody` # Add a helper template for accessing the message type: # e.g. p2p.hello: template `msgName`*(`PROTO`: type `protocolName`): type = `msgRecName` # Add a helper template for obtaining the message Id for # a particular message type: template msgProtocol*(`MSG`: type `msgStrongRecName`): type = `protocolName` template RecType*(`MSG`: type `msgStrongRecName`): untyped = `msgRecName` proc genCode*(p: P2PProtocol): NimNode = for msg in p.messages: p.backend.implementMsg msg result = newStmtList() result.add p.genTypeSection() let protocolInfoVar = p.protocolInfoVar protocolInfoVarObj = ident($protocolInfoVar & "Obj") protocolName = p.nameIdent protocolInit = p.backend.implementProtocolInit(p) result.add quote do: # One global variable per protocol holds the protocol run-time data var `protocolInfoVarObj` = `protocolInit` let `protocolInfoVar` = addr `protocolInfoVarObj` # The protocol run-time data is available as a pseudo-field # (e.g. `p2p.protocolInfo`) template protocolInfo*(`PROTO`: type `protocolName`): auto = `protocolInfoVar` result.add p.outSendProcs, p.outRecvProcs, p.outProcRegistrations if p.onPeerConnected != nil: result.add p.onPeerConnected if p.onPeerDisconnected != nil: result.add p.onPeerDisconnected result.add newCall(p.backend.setEventHandlers, protocolInfoVar, nameOrNil p.onPeerConnected, nameOrNil p.onPeerDisconnected) macro emitForSingleBackend( name: static[string], version: static[int], backend: static[BackendFactory], body: untyped, # TODO Nim can't handle a proper duration parameter here timeouts: static[int64] = defaultReqTimeout.milliseconds, outgoingRequestDecorator: untyped = nil, incomingRequestDecorator: untyped = nil, incomingRequestThunkDecorator: untyped = nil, incomingResponseDecorator: untyped = nil, incomingResponseThunkDecorator: untyped = nil, peerState = type(nil), networkState = type(nil)): untyped = var p = P2PProtocol.init( backend, name, version, body, timeouts, outgoingRequestDecorator, incomingRequestDecorator, incomingRequestThunkDecorator, incomingResponseDecorator, incomingResponseThunkDecorator, peerState.getType, networkState.getType) result = p.genCode() try: result.storeMacroResult true except IOError: # IO error so the generated nim code might not be stored, don't sweat it. discard macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped = let name = $(options[0]) var backends = newSeq[NimNode]() if backendSyms.kind == nnkSym: backends.add backendSyms else: for backend in backendSyms: backends.add backend result = newStmtList() for backend in backends: let call = copy options call[0] = bindSym"emitForSingleBackend" call.add newTree(nnkExprEqExpr, ident("name"), newLit(name)) call.add newTree(nnkExprEqExpr, ident("backend"), backend) call.add newTree(nnkExprEqExpr, ident("body"), body) result.add call template p2pProtocol*(options: untyped, body: untyped) {.dirty.} = bind emitForAllBackends emitForAllBackends(p2pProtocolBackendImpl, options, body)