diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index faf0a4358..96bd3552a 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -275,12 +275,6 @@ template getRecipient(stream: P2PStream): P2PStream = template getRecipient(response: Response): Peer = UntypedResponse(response).peer -proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} = - result = "" - # TODO: uncommenting the line below increases the compile-time - # tremendously (for reasons not yet known) - # result = $(cast[ptr MsgType](msg)[]) - proc initProtocol(name: string, peerInit: PeerStateInitializer, networkInit: NetworkStateInitializer): ProtocolInfoObj = @@ -374,6 +368,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.registerProtocol = bindSym "registerProtocol" result.setEventHandlers = bindSym "setEventHandlers" result.SerializationFormat = Format + result.ResponseType = Response result.afterProtocolInit = proc (p: P2PProtocol) = p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream) @@ -501,13 +496,13 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = rawSendProc = msgName & "RawSend" handshakeTypeName = $msgRecName handshakeExchanger = msg.createSendProc(nnkMacroDef) - paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger) + paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def) bindSym = ident "bindSym" getAst = ident "getAst" handshakeImpl = ident "handshakeImpl" # TODO: macros.body triggers an assertion error when the proc type is nnkMacroDef - handshakeExchanger[6] = quote do: + handshakeExchanger.def[6] = quote do: let stream = ident"handshakeStream" rawSendProc = `bindSymOp` `rawSendProc` @@ -521,7 +516,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = return `getAst`(`handshakeImpl`(`msgRecName`, peer, stream, lazySendCall, timeout)) - p.outSendProcs.add handshakeExchanger + p.outSendProcs.add handshakeExchanger.def msgSendProc.params[1][1] = P2PStream msgSendProc.name = ident rawSendProc diff --git a/beacon_chain/libp2p_spec_backend.nim b/beacon_chain/libp2p_spec_backend.nim index db406678f..2cb99ce4b 100644 --- a/beacon_chain/libp2p_spec_backend.nim +++ b/beacon_chain/libp2p_spec_backend.nim @@ -7,7 +7,7 @@ import const # Compression nibble NoCompression* = uint 0 - + # Encoding nibble SszEncoding* = uint 1 @@ -79,7 +79,7 @@ type printer*: MessageContentPrinter nextMsgResolver*: NextMsgResolver requestResolver*: RequestResolver - + ProtocolInfoObj* = object name*: string version*: int @@ -110,10 +110,10 @@ type PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.} - + HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.} DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.} - + ThunkProc* = proc(peer: Peer, stream: P2PStream, reqId: uint64, @@ -127,17 +127,25 @@ type InvalidMsgIdError = object of InvalidMsgError + PeerDisconnected* = object of P2PBackendError + reason*: DisconnectionReason + var gProtocols: seq[ProtocolInfo] -include eth/p2p/p2p_backends_helpers -include eth/p2p/p2p_tracing +const + HandshakeTimeout = FaultOrError + # TODO: this doesn't seem right. + # We should lobby for more disconnection reasons. proc `$`*(peer: Peer): string = $peer.id -proc readFixedSizeStruct(stream: P2PStream, T: type): Future[T] {.async.} = +proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} = await stream.transp.readExactly(addr result, sizeof result) +proc appendPackedObject*(stream: ByteStreamVar, value: auto) = + stream.append makeOpenArray(unsafeAddr value, sizeof value) + type PeerLoopExitReason = enum Success @@ -147,6 +155,29 @@ type InactivePeer InternalError +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = + # TODO: How should we notify the other peer? + if peer.connectionState notin {Disconnecting, Disconnected}: + peer.connectionState = Disconnecting + await peer.network.daemon.disconnect(peer.id) + peer.connectionState = Disconnected + peer.network.peers.del(peer.id) + +template raisePeerDisconnected(msg: string, r: DisconnectionReason) = + var e = newException(PeerDisconnected, msg) + e.reason = r + raise e + +proc disconnectAndRaise(peer: Peer, + reason: DisconnectionReason, + msg: string) {.async.} = + let r = reason + await peer.disconnect(r) + raisePeerDisconnected(msg, r) + +include eth/p2p/p2p_backends_helpers +include eth/p2p/p2p_tracing + proc accepts(d: Dispatcher, methodId: uint16): bool = methodId.int < d.messages.len @@ -166,31 +197,23 @@ proc invokeThunk(peer: Peer, return thunk(peer, stream, reqId, msgContents) -proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = - # TODO: How should we notify the other peer? - if peer.connectionState notin {Disconnecting, Disconnected}: - peer.connectionState = Disconnecting - await peer.network.daemon.disconnect(peer.id) - peer.connectionState = Disconnected - peer.network.peers.del(peer.id) - proc recvAndDispatchMsg*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream): Future[PeerLoopExitReason] {.async.} = template fail(reason) = return reason - var outerHeader = await stream.readFixedSizeStruct(SpecOuterMsgHeader) + var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader) if outerHeader.compression != NoCompression: fail UnsupportedCompression - + if outerHeader.encoding != SszEncoding: fail UnsupportedEncoding if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64: fail ProtocolViolation - var innerHeader = await stream.readFixedSizeStruct(SpecInnerMsgHeader) + var innerHeader = await stream.readPackedObject(SpecInnerMsgHeader) var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64) await stream.transp.readExactly(addr msgContent[0], msgContent.len) @@ -257,7 +280,7 @@ proc registerRequest(peer: Peer, protocol: ProtocolInfo, timeout: Duration, responseFuture: FutureBase, - responseMsgId: int): int = + responseMethodId: uint16): int = inc peer.lastSentMsgId result = peer.lastSentMsgId @@ -265,11 +288,11 @@ proc registerRequest(peer: Peer, let req = OutstandingRequest(id: result, future: responseFuture, timeoutAt: timeoutAt) - peer.outstandingRequests[responseMsgId].addLast req + peer.outstandingRequests[responseMethodId.int].addLast req - let requestResolver = protocol.dispatcher.messages[responseMsgId].requestResolver + let requestResolver = protocol.dispatcher.messages[responseMethodId.int].requestResolver proc timeoutExpired(udata: pointer) = requestResolver(nil, responseFuture) - + addTimer(timeoutAt, timeoutExpired, nil) proc resolveResponseFuture(peer: Peer, protocol: ProtocolInfo, msgId: int, msg: pointer, reqId: int) = @@ -369,34 +392,48 @@ proc registerMsg(protocol: ProtocolInfo, template applyDecorator(p: NimNode, decorator: NimNode) = if decorator.kind != nnkNilLit: p.addPragma decorator -proc implementSendProcBody(msg: Message): NimNode = +proc prepareRequest(peer: Peer, + protocol: ProtocolInfo, + requestMethodId, responseMethodId: uint16, + stream: ByteStreamVar, + timeout: Duration, + responseFuture: FutureBase): DelayedWriteCursor = + + let reqId = registerRequest(peer, protocol, timeout, + responseFuture, responseMethodId) + + result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + + stream.appendPackedObject SpecInnerMsgHeader( + reqId: reqId, + methodId: requestMethodId) + +proc implementSendProcBody(sendProc: SendProc) = + let + msg = sendProc.msg + resultIdent = ident "result" + delayedWriteCursor = ident "delayedWriteCursor" + proc preludeGenerator(stream: NimNode): NimNode = result = newStmtList() - if mgs.kind == msgRequest: - var reqId = ident "reqId" - let reqToResponseOffset = responseMsgId - msgId - let responseMsgId = quote do: `perPeerMsgIdVar` + `reqToResponseOffset` - - # Each request is registered so we can resolve it when the response - # arrives. There are two types of protocols: LES-like protocols use - # explicit `reqId` sent over the wire, while the ETH wire protocol - # assumes there is one outstanding request at a time (if there are - # multiple requests we'll resolve them in FIFO order). - let registerRequestCall = newCall(registerRequest, msgRecipient, - msg.timeoutParam[0], - resultIdent, - responseMsgId) + if msg.kind == msgRequest: + let + reqId = ident "reqId" + appendPackedObject = bindSym "appendPackedObject" + requestMethodId = newLit(msg.id) + responseMethodId = newLit(msg.response.id) + peer = sendProc.peerParam + protocol = sendProc.msg.protocol.protocolInfoVar + timeout = sendProc.timeoutParam result.add quote do: - let `reqId` = `registerRequestCall` - #`stream`.write(uint64(`reqId`)) - #`stream`.write(uint16(`methodId`)) + let `delayedWriteCursor` = `prepareRequest`( + `peer`, `protocol`, `requestMethodId`, `responseMethodId`, `stream`, `timeout`, `resultIdent`) proc sendCallGenerator(peer, bytes: NimNode): NimNode = let linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture" sendMsg = bindSym "sendMsg" - resultIdent = ident "result" sendCall = newCall(sendMsg, peer, bytes) if msg.kind == msgRequest: @@ -411,13 +448,13 @@ proc implementSendProcBody(msg: Message): NimNode = # `sendMsg` call. quote: return `sendCall` - msg.createSendProcBody(preludeGenerator, sendCallGenerator) + sendProc.implementBody(preludeGenerator, sendCallGenerator) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = let resultIdent = ident "result" Option = bindSym "Option" - + # XXX: Binding the int type causes instantiation failure for some reason # Int = bindSym "int" Int = ident "int" @@ -432,17 +469,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = messagePrinter = bindSym "messagePrinter" nextMsgResolver = bindSym "nextMsgResolver" - registerRequest = bindSym "registerRequest" requestResolver = bindSym "requestResolver" resolveResponseFuture = bindSym "resolveResponseFuture" nextMsg = bindSym "nextMsg" initProtocol = bindSym "initProtocol" registerMsg = bindSym "registerMsg" - + peer = ident "peer" reqId = ident "reqId" stream = ident "stream" protocol = ident "protocol" + response = ident "response" msgContents = ident "msgContents" receivedMsg = ident "receivedMsg" @@ -457,16 +494,16 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.PeerType = Peer result.NetworkType = EthereumNode result.SerializationFormat = Format + result.ResponseType = ResponseWithId - result.implementMsg = proc (p: P2PProtocol, msg: Message, resp: Message = nil) = + result.implementMsg = proc (protocol: P2PProtocol, msg: Message) = var - msgId = msg.id - msgIdLit = newLit(msgId) + msgIdLit = newLit(msg.id) msgRecName = msg.recIdent msgKind = msg.kind n = msg.procDef - responseMsgId = if resp != nil: resp.id else: -1 - responseRecord = if resp != nil: resp.recIdent else: nil + responseMethodId = if msg.response != nil: msg.response.id else: -1 + responseRecord = if msg.response != nil: msg.response.recIdent else: nil msgIdent = n.name msgName = $msgIdent hasReqIds = msgKind in {msgRequest, msgResponse} @@ -474,75 +511,43 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = # variables used in the sending procs msgRecipient = ident"msgRecipient" - sendTo = ident"sendTo" rlpWriter = ident"writer" paramsToWrite = newSeq[NimNode](0) perPeerMsgIdVar = ident"perPeerMsgId" - # nodes to store the user-supplied message handling proc if present - userHandlerCall: NimNode = nil - awaitUserHandler = newStmtList() - - case msgKind - of msgRequest: - discard - - of msgResponse: - if hasReqIds: - paramsToWrite.add newDotExpr(sendTo, ident"id") - - of msgHandshake, msgNotification: discard - + ## + ## Augment user handler + ## if msg.userHandler != nil: - var extraDefs: NimNode if msgKind == msgRequest: + msg.userHandler.params.insert(2, newIdentDefs(reqId, Int)) + let peerParam = msg.userHandler.params[1][0] - let response = ident"response" - if hasReqIds: - extraDefs = quote do: - let `response` = `ResponseWithId`[`responseRecord`](peer: `peerParam`, id: `reqId`) - else: - extraDefs = quote do: - let `response` = `Response`[`responseRecord`](`peerParam`) + let extraDefs = quote do: + let `response` = `ResponseWithId`[`responseRecord`](peer: `peerParam`, id: `reqId`) msg.userHandler.addPreludeDefs extraDefs - # This is the call to the user supplied handled. Here we add only the - # initial peer param, while the rest of the params will be added later. - userHandlerCall = newCall(msg.userHandler.name, peer) - - if hasReqIds: - msg.userHandler.params.insert(2, newIdentDefs(reqId, ident"int")) - userHandlerCall.add reqId - - # When there is a user handler, it must be awaited in the thunk proc. - # Above, by default `awaitUserHandler` is set to a no-op statement list. - awaitUserHandler = newCall("await", userHandlerCall) - - p.outRecvProcs.add(msg.userHandler) - - for param, paramType in n.typedParams(skip = 1): - # This is a fragment of the sending proc that - # serializes each of the passed parameters: - paramsToWrite.add param - - # If there is user message handler, we'll place a call to it by - # unpacking the fields of the received message: - if userHandlerCall != nil: - userHandlerCall.add newDotExpr(receivedMsg, param) + protocol.outRecvProcs.add(msg.userHandler) + ## + ## Implemenmt Thunk + ## let traceMsg = when tracingEnabled: newCall(bindSym"logReceivedMsg", peer, receivedMsg) else: newStmtList() - + # variables used in the receiving procs let callResolvedResponseFuture = if msgKind == msgResponse: newCall(resolveResponseFuture, peer, msgIdLit, newCall("addr", receivedMsg), reqId) else: newStmtList() - let thunkName = ident(msgName & "_thunk") + let + awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, peer, reqId) + thunkName = ident(msgName & "_thunk") + var thunkProc = quote do: proc `thunkName`(`peer`: `Peer`, `stream`: `P2PStream`, @@ -553,67 +558,23 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = `awaitUserHandler` `callResolvedResponseFuture` - for p in userPragmas: thunkProc.addPragma p + protocol.outRecvProcs.add thunkProc - case msgKind - of msgRequest: thunkProc.applyDecorator p.incomingRequestThunkDecorator - of msgResponse: thunkProc.applyDecorator p.incomingResponseThunkDecorator - else: discard - - p.outRecvProcs.add thunkProc + ## + ## Implement Senders and Handshake + ## + var sendProc = createSendProc(msg, isRawSender = (msg.kind == msgHandshake)) + protocol.outSendProcs.add sendProc.allDefs - var msgSendProc = n - let msgSendProcName = n.name - p.outSendProcs.add msgSendProc - - # TODO: check that the first param has the correct type - msgSendProc.params[1][0] = sendTo - msgSendProc.addPragma ident"gcsafe" - - case msgKind - of msgRequest: - # Add a timeout parameter for all request procs - msgSendProc.params.add msg.timeoutParam - of msgResponse: - # A response proc must be called with a response object that originates - # from a certain request. Here we change the Peer parameter at position - # 1 to the correct strongly-typed ResponseType. The incoming procs still - # gets the normal Peer paramter. - # let rsp = bindSym "Response" - # let rspId = bindSym "ResponseWithId" - let - ResponseType = newTree(nnkBracketExpr, ResponseWithId, msgRecName) - - msgSendProc.params[1][1] = ResponseType - - p.outSendProcs.add quote do: - template send*(r: `ResponseType`, args: varargs[untyped]): auto = - `msgSendProcName`(r, args) - else: discard - - # We change the return type of the sending proc to a Future. - # If this is a request proc, the future will return the response record. - let rt = if msgKind != msgRequest: ident"void" - else: newTree(nnkBracketExpr, Option, responseRecord) - msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt) - - let msgBytes = ident"msgBytes" - - let finalizeRequest = quote do: - let `msgBytes` = `finish`(`rlpWriter`) - - if msgKind == msgHandshake: + if msg.kind == msgHandshake: var rawSendProc = genSym(nskProc, msgName & "RawSend") - handshakeExchanger = newProc(name = msg.identWithExportMarker, - procType = nnkTemplateDef) + handshakeExchanger = createSendProc(msg, procType = nnkTemplateDef) - handshakeExchanger.params = msgSendProc.params.copyNimTree - handshakeExchanger.params.add msg.timeoutParam - handshakeExchanger.params[0] = newTree(nnkBracketExpr, ident("Future"), msgRecName) + handshakeExchanger.def.params[0] = newTree(nnkBracketExpr, ident("Future"), msgRecName) var - forwardCall = newCall(rawSendProc).appendAllParams(handshakeExchanger) + forwardCall = newCall(rawSendProc).appendAllParams(handshakeExchanger.def) peerValue = forwardCall[1] timeoutValue = msg.timeoutParam[0] handshakeImpl = ident"handshakeImpl" @@ -621,7 +582,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = forwardCall[1] = peer forwardCall.del(forwardCall.len - 1) - handshakeExchanger.body = quote do: + handshakeExchanger.def.body = quote do: let `peer` = `peerValue` let sendingFuture = `forwardCall` `handshakeImpl`(`peer`, @@ -629,36 +590,24 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = `nextMsg`(`peer`, `msgRecName`), `timeoutValue`) - msgSendProc.name = rawSendProc - p.outSendProcs.add handshakeExchanger + sendProc.def.name = rawSendProc + protocol.outSendProcs.add handshakeExchanger.def else: - # Make the send proc public - msgSendProc.name = msg.identWithExportMarker + implementSendProcBody sendProc - let initWriter = quote do: - var `rlpWriter` = `initRlpWriter`() - const `perProtocolMsgId` = `msgId` - let `perPeerMsgIdVar` = `msgIdLit` - `append`(`rlpWriter`, `perPeerMsgIdVar`) - - msgSendProc.body = implementSendProcBody(msg) - - if msgKind == msgRequest: - msgSendProc.applyDecorator p.outgoingRequestDecorator - - p.outProcRegistrations.add( + protocol.outProcRegistrations.add( newCall(registerMsg, - p.protocolInfoVar, - newIntLitNode(msgId), - newStrLitNode($n.name), + protocol.protocolInfoVar, + msgIdLit, + newLit(msgName), thunkName, newTree(nnkBracketExpr, messagePrinter, msgRecName), newTree(nnkBracketExpr, requestResolver, msgRecName), newTree(nnkBracketExpr, nextMsgResolver, msgRecName))) - result.implementProtocolInit = proc (p: P2PProtocol): NimNode = + result.implementProtocolInit = proc (protocol: P2PProtocol): NimNode = return newCall(initProtocol, - newLit(p.shortName), - newLit(p.version), - p.peerInit, p.netInit) + newLit(protocol.shortName), + newLit(protocol.version), + protocol.peerInit, protocol.netInit)