From 84afb77b27e4c70962e323f8cee8ea46e9957448 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Fri, 21 Jun 2019 19:32:52 +0300 Subject: [PATCH] Restore compilation with libp2p_native after the latest changes in the spec back-end --- beacon_chain/libp2p_backend.nim | 72 ++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index c3d949e7f..6b8665418 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -70,7 +70,7 @@ type PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.} - HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.} + HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.} DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.} ThunkProc* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} @@ -96,8 +96,7 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} = new result result.daemon = daemon result.daemon.userData = result - result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config - init result.peers + result.peers = initTable[PeerID, Peer]() newSeq result.protocolStates, allProtocols.len for proto in allProtocols: @@ -151,7 +150,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, # Send the request let stream = streamFut.read let sent = await stream.transp.write(requestBytes) - if sent != requestBytes: + if sent != requestBytes.len: await disconnectAndRaise(peer, FaultOrError, "Incomplete send") # Read the response @@ -222,6 +221,7 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer = result.network = network result.awaitedMessages = initTable[CompressedMsgId, FutureBase]() result.connectionState = Connected + result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config newSeq result.protocolStates, allProtocols.len for i in 0 ..< allProtocols.len: let proto = allProtocols[i] @@ -236,6 +236,9 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} = await all(subProtocolsHandshakes) +template initializeConnection*(peer: Peer): auto = + performProtocolHandshakes(peer) + template getRecipient(peer: Peer): Peer = peer @@ -357,7 +360,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.ResponderType = Responder result.afterProtocolInit = proc (p: P2PProtocol) = - p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream) + p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream) result.implementMsg = proc (msg: Message) = let @@ -371,7 +374,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream)) msg.initResponderCall.add streamVar - let awaitUserHandler = msg.genAwaitUserHandler(newCall("get", receivedMsg), [peerVar, streamVar]) + ## + ## Implemenmt Thunk + ## + let awaitUserHandler = msg.genAwaitUserHandler( + newCall("get", receivedMsg), [peerVar, streamVar]) let tracing = when tracingEnabled: quote do: logReceivedMsg(`streamVar`.peer, `receivedMsg`.get) @@ -382,17 +389,46 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout)) thunkName = ident(msgName & "_thunk") - msg.defineThunk quote do: - proc `thunkName`(`daemonVar`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} = - var `deadlineVar` = sleepAsync `requestDataTimeout` - var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `deadlineVar`) - if `receivedMsg`.isNone: - # TODO: This peer is misbehaving, perhaps we should penalize him somehow - return - let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`) - `tracing` - `awaitUserHandler` - `resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`)) + msg.defineThunk if msg.kind == msgHandshake: + # In LibP2P protocols, the `onPeerConnected` handler is executed when the + # other peer opens a stream. Contrary to other thunk procs, the message is + # not immediately deserialized. Instead, the handshake "sender proc" acts + # as an exchanger that sends our handshake message while deserializing the + # contents of the other peer's handshake. + # Thus, the very first communication act of the `onPeerConnected` handler + # must be the execution of the handshake exchanger. + let handshake = msg.protocol.onPeerConnected + if handshake.isNil: + macros.error "A LibP2P protocol with a handshake must also include an " & + "`onPeerConnected` handler.", msg.procDef + + # We must generate a forward declaration for the `onPeerConnected` handler, + # so we can call it from the thunk proc: + let handshakeProcName = handshake.name + msg.protocol.outRecvProcs.add quote do: + proc `handshakeProcName`(`peerVar`: `Peer`, + `streamVar`: `P2PStream`) {.async, gcsafe.} + + quote: + proc `thunkName`(`daemonVar`: `DaemonAPI`, + `streamVar`: `P2PStream`): Future[void] {.gcsafe.} = + let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`) + return `handshakeProcName`(`peerVar`, `streamVar`) + else: + quote: + proc `thunkName`(`daemonVar`: `DaemonAPI`, + `streamVar`: `P2PStream`) {.async, gcsafe.} = + var `deadlineVar` = sleepAsync `requestDataTimeout` + var `receivedMsg` = `await` readMsg(`streamVar`, + `msgRecName`, + `deadlineVar`) + if `receivedMsg`.isNone: + # TODO: This peer is misbehaving, perhaps we should penalize him somehow + return + let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`) + `tracing` + `awaitUserHandler` + `resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`)) ## ## Implement Senders and Handshake @@ -410,7 +446,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = handshakeExchanger.setBody quote do: let - stream = ident"handshakeStream" + stream = ident "stream" rawSendProc = `bindSymOp` `rawSendProc` params = `paramsArray` lazySendCall = newCall(rawSendProc, params)