From 877b22cfb8487fb4b8d80d586f917b8726aea5c6 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Wed, 5 Jun 2019 05:00:07 +0300 Subject: [PATCH] Share more code between the libp2p backends --- beacon_chain/eth2_network.nim | 9 +- beacon_chain/libp2p_backend.nim | 303 +++++++----------------- beacon_chain/libp2p_backends_common.nim | 42 ++++ beacon_chain/libp2p_spec_backend.nim | 118 +++------ beacon_chain/sync_protocol.nim | 2 +- 5 files changed, 169 insertions(+), 305 deletions(-) create mode 100644 beacon_chain/libp2p_backends_common.nim diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 4e96ca052..d6eba1eb1 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -125,17 +125,12 @@ else: when networkBackend == libp2pSpecBackend: import libp2p_spec_backend export libp2p_spec_backend - - const - BreachOfProtocol* = FaultOrError - netBackendName* = "libp2p_spec" + const netBackendName* = "libp2p_spec" else: import libp2p_backend export libp2p_backend - - const - netBackendName* = "libp2p_native" + const netBackendName* = "libp2p_native" type BootstrapAddr* = PeerInfo diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index f8496e10b..677ca3937 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -14,6 +14,8 @@ type peers*: Table[PeerID, Peer] protocolStates*: seq[RootRef] + EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers + Peer* = ref object network*: Eth2Node id*: PeerID @@ -21,7 +23,31 @@ type awaitedMessages: Table[CompressedMsgId, FutureBase] protocolStates*: seq[RootRef] - EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers + ConnectionState* = enum + None, + Connecting, + Connected, + Disconnecting, + Disconnected + + DisconnectionReason* = enum + UselessPeer + BreachOfProtocol + + UntypedResponder = object + peer*: Peer + stream*: P2PStream + + Responder*[MsgType] = distinct UntypedResponder + + MessageInfo* = object + name*: string + + # Private fields: + thunk*: ThunkProc + libp2pProtocol: string + printer*: MessageContentPrinter + nextMsgResolver*: NextMsgResolver ProtocolInfoObj* = object name*: string @@ -37,17 +63,8 @@ type ProtocolInfo* = ptr ProtocolInfoObj - MessageInfo* = object - name*: string - - # Private fields: - thunk*: ThunkProc - libp2pProtocol: string - printer*: MessageContentPrinter - nextMsgResolver*: NextMsgResolver - CompressedMsgId = tuple - protocolIndex, msgId: int + protocolIdx, methodId: int PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.} @@ -57,25 +74,8 @@ type MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.} - ConnectionState* = enum - None, - Connecting, - Connected, - Disconnecting, - Disconnected - - UntypedResponder = object - peer*: Peer - stream*: P2PStream - - Responder*[MsgType] = distinct UntypedResponder - Bytes = seq[byte] - DisconnectionReason* = enum - UselessPeer - BreachOfProtocol - PeerDisconnected* = object of CatchableError reason*: DisconnectionReason @@ -84,34 +84,11 @@ const defaultOutgoingReqTimeout = 10000 HandshakeTimeout = BreachOfProtocol -var - gProtocols: seq[ProtocolInfo] + IrrelevantNetwork* = UselessPeer -# The variables above are immutable RTTI information. We need to tell -# Nim to not consider them GcSafe violations: -template allProtocols: auto = {.gcsafe.}: gProtocols - -proc `$`*(peer: Peer): string = $peer.id - -proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = - # TODO: How should we notify the other peer? - if peer.connectionState notin {Disconnecting, Disconnected}: - peer.connectionState = Disconnecting - await peer.network.daemon.disconnect(peer.id) - peer.connectionState = Disconnected - peer.network.peers.del(peer.id) - -template raisePeerDisconnected(msg: string, r: DisconnectionReason) = - var e = newException(PeerDisconnected, msg) - e.reason = r - raise e - -proc disconnectAndRaise(peer: Peer, - reason: DisconnectionReason, - msg: string) {.async.} = - let r = reason - await peer.disconnect(reason) - raisePeerDisconnected(msg, reason) +include libp2p_backends_common +include eth/p2p/p2p_backends_helpers +include eth/p2p/p2p_tracing proc init*(node: Eth2Node) {.async.} = node.daemon = await newDaemonApi({PSGossipSub}) @@ -127,9 +104,6 @@ proc init*(node: Eth2Node) {.async.} = if msg.libp2pProtocol.len > 0: await node.daemon.addHandler(@[msg.libp2pProtocol], msg.thunk) -include eth/p2p/p2p_backends_helpers -include eth/p2p/p2p_tracing - proc readMsg(stream: P2PStream, MsgType: type, timeout = 10.seconds): Future[Option[MsgType]] {.async.} = var timeout = sleepAsync timeout @@ -212,24 +186,6 @@ template handshakeImpl(HandshakeTypeExpr: untyped, asyncStep(stream) -proc getCompressedMsgId(MsgType: type): CompressedMsgId = - mixin msgProtocol, protocolInfo, msgId - (protocolIndex: MsgType.msgProtocol.protocolInfo.index, msgId: MsgType.msgId) - -proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = - ## This procs awaits a specific P2P message. - ## Any messages received while waiting will be dispatched to their - ## respective handlers. The designated message handler will also run - ## to completion before the future returned by `nextMsg` is resolved. - mixin msgProtocol, protocolInfo, msgId - let awaitedMsgId = getCompressedMsgId(MsgType) - let f = getOrDefault(peer.awaitedMessages, awaitedMsgId) - if not f.isNil: - return Future[MsgType](f) - - newFuture result - peer.awaitedMessages[awaitedMsgId] = result - proc resolveNextMsgFutures(peer: Peer, msg: auto) = type MsgType = type(msg) let msgId = getCompressedMsgId(MsgType) @@ -284,6 +240,13 @@ proc initProtocol(name: string, result.peerStateInitializer = peerInit result.networkStateInitializer = networkInit +proc registerProtocol(protocol: ProtocolInfo) = + # TODO: This can be done at compile-time in the future + let pos = lowerBound(gProtocols, protocol) + gProtocols.insert(protocol, pos) + for i in 0 ..< gProtocols.len: + gProtocols[i].index = i + proc setEventHandlers(p: ProtocolInfo, handshake: HandshakeStep, disconnectHandler: DisconnectionHandler) = @@ -300,13 +263,6 @@ proc registerMsg(protocol: ProtocolInfo, libp2pProtocol: libp2pProtocol, printer: printer) -proc registerProtocol(protocol: ProtocolInfo) = - # TODO: This can be done at compile-time in the future - let pos = lowerBound(gProtocols, protocol) - gProtocols.insert(protocol, pos) - for i in 0 ..< gProtocols.len: - gProtocols[i].index = i - proc getRequestProtoName(fn: NimNode): NimNode = when true: return newLit("rpc/" & $fn.name) @@ -325,42 +281,52 @@ proc init*[MsgType](T: type Responder[MsgType], peer: Peer, stream: P2PStream): T = T(UntypedResponder(peer: peer, stream: stream)) +proc implementSendProcBody(sendProc: SendProc) = + let + msg = sendProc.msg + peer = sendProc.peerParam + timeout = sendProc.timeoutParam + ResponseRecord = if msg.response != nil: msg.response.recIdent else: nil + UntypedResponder = bindSym "UntypedResponder" + sendMsg = bindSym "sendMsg" + sendBytes = bindSym "sendBytes" + makeEth2Request = bindSym "makeEth2Request" + + proc sendCallGenerator(peer, bytes: NimNode): NimNode = + if msg.kind != msgResponse: + let msgProto = getRequestProtoName(msg.procDef) + case msg.kind + of msgRequest: + let timeout = msg.timeoutParam[0] + quote: `makeEth2Request`(`peer`, `msgProto`, `bytes`, + `ResponseRecord`, `timeout`) + of msgHandshake: + quote: `sendBytes`(`peer`, `bytes`) + else: + quote: `sendMsg`(`peer`, `msgProto`, `bytes`) + else: + quote: `sendBytes`(`UntypedResponder`(`peer`).stream, `bytes`) + + sendProc.useStandardBody(nil, sendCallGenerator) + proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var - name_openStream = newTree(nnkPostfix, ident("*"), ident"openStream") - outputStream = ident "outputStream" Format = ident "SSZ" - Option = bindSym "Option" - UntypedResponder = bindSym "UntypedResponder" Responder = bindSym "Responder" DaemonAPI = bindSym "DaemonAPI" P2PStream = ident "P2PStream" - # XXX: Binding the int type causes instantiation failure for some reason - # Int = bindSym "int" - Int = ident "int" - Void = ident "void" Peer = bindSym "Peer" Eth2Node = bindSym "Eth2Node" - writeField = bindSym "writeField" - getOutput = bindSym "getOutput" messagePrinter = bindSym "messagePrinter" - getRecipient = bindSym "getRecipient" peerFromStream = bindSym "peerFromStream" - makeEth2Request = bindSym "makeEth2Request" handshakeImpl = bindSym "handshakeImpl" - sendMsg = bindSym "sendMsg" - sendBytes = bindSym "sendBytes" resolveNextMsgFutures = bindSym "resolveNextMsgFutures" milliseconds = bindSym "milliseconds" registerMsg = bindSym "registerMsg" initProtocol = bindSym "initProtocol" bindSymOp = bindSym "bindSym" - msgRecipient = ident "msgRecipient" - sendTo = ident "sendTo" - writer = ident "writer" - recordStartMemo = ident"recordStartMemo" receivedMsg = ident "msg" - daemon = ident "daemon" + daemonVar = ident "daemon" streamVar = ident "stream" await = ident "await" @@ -380,94 +346,53 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.implementMsg = proc (msg: Message) = let - n = msg.procDef protocol = msg.protocol - msgId = newLit(msg.id) - msgIdent = n.name - msgName = $msgIdent - msgKind = msg.kind + msgName = $msg.ident msgRecName = msg.recIdent - ResponseRecord = if msg.response != nil: msg.response.recIdent else: nil - userPragmas = n.pragma - if n.body.kind != nnkEmpty and msg.kind == msgRequest: + if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest: # Request procs need an extra param - the stream where the response # should be written: - msg.userHandler.params.insert(1, newIdentDefs(streamVar, P2PStream)) + msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream)) msg.initResponderCall.add streamVar - let awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, [streamVar, peerVar]) + let awaitUserHandler = msg.genAwaitUserHandler(newCall("get", receivedMsg), [peerVar, streamVar]) let tracing = when tracingEnabled: quote do: logReceivedMsg(`streamVar`.peer, `receivedMsg`.get) else: newStmtList() - let requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout)) - let thunkName = ident(msgName & "_thunk") - var thunkProc = quote do: - proc `thunkName`(`daemon`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} = + let + requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout)) + thunkName = ident(msgName & "_thunk") + + msg.defineThunk quote do: + proc `thunkName`(`daemonVar`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} = var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `requestDataTimeout`) if `receivedMsg`.isNone: # TODO: This peer is misbehaving, perhaps we should penalize him somehow return - let `peerVar` = `peerFromStream`(`daemon`, `streamVar`) + let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`) `tracing` `awaitUserHandler` `resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`)) - protocol.outRecvProcs.add thunkProc + ## + ## Implement Senders and Handshake + ## + var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake)) + implementSendProcBody sendProc - var - # variables used in the sending procs - appendParams = newNimNode(nnkStmtList) - paramsToWrite = newSeq[NimNode](0) - - for param, paramType in n.typedParams(skip = 1): - paramsToWrite.add param - - var msgSendProc = n - let msgSendProcName = n.name - protocol.outSendProcs.add msgSendProc - - # TODO: check that the first param has the correct type - msgSendProc.params[1][0] = sendTo - msgSendProc.addPragma ident"gcsafe" - - # Add a timeout parameter for all request procs - case msgKind - of msgRequest: - # Add a timeout parameter for all request procs - msgSendProc.params.add msg.timeoutParam - of msgResponse: - # A response proc must be called with a response object that originates - # from a certain request. Here we change the Peer parameter at position - # 1 to the correct strongly-typed ResponseType. The incoming procs still - # gets the normal Peer paramter. - let ResponseType = newTree(nnkBracketExpr, Responder, msgRecName) - msgSendProc.params[1][1] = ResponseType - protocol.outSendProcs.add quote do: - template send*(r: `ResponseType`, args: varargs[untyped]): auto = - `msgSendProcName`(r, args) - else: discard - - # We change the return type of the sending proc to a Future. - # If this is a request proc, the future will return the response record. - let rt = if msgKind != msgRequest: Void - else: newTree(nnkBracketExpr, Option, ResponseRecord) - msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt) - - if msgKind == msgHandshake: + if msg.kind == msgHandshake: var - rawSendProc = msgName & "RawSend" - handshakeTypeName = $msgRecName + rawSendProc = newLit($sendProc.def.name) handshakeExchanger = msg.createSendProc(nnkMacroDef) paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def) bindSym = ident "bindSym" getAst = ident "getAst" - # TODO: macros.body triggers an assertion error when the proc type is nnkMacroDef - handshakeExchanger.def[6] = quote do: + handshakeExchanger.setBody quote do: let stream = ident"handshakeStream" rawSendProc = `bindSymOp` `rawSendProc` @@ -481,64 +406,14 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = return `getAst`(`handshakeImpl`(`msgRecName`, peer, stream, lazySendCall, timeout)) - protocol.outSendProcs.add handshakeExchanger.def - - msgSendProc.params[1][1] = P2PStream - msgSendProc.name = ident rawSendProc - else: - # Make the send proc public - msgSendProc.name = msg.identWithExportMarker - - let initWriter = quote do: - var `outputStream` = init OutputStream - var `writer` = init(WriterType(`Format`), `outputStream`) - var `recordStartMemo` = beginRecord(`writer`, `msgRecName`) - - for param in paramsToWrite: - appendParams.add newCall(writeField, writer, newLit($param), param) - - when tracingEnabled: - appendParams.add logSentMsgFields(msgRecipient, protocol, msgName, paramsToWrite) - - let msgBytes = ident"msgBytes" - let finalizeRequest = quote do: - endRecord(`writer`, `recordStartMemo`) - let `msgBytes` = `getOutput`(`outputStream`) - - var msgProto = newLit("") - let sendCall = - if msgKind != msgResponse: - msgProto = getRequestProtoName(n) - - when false: - var openStreamProc = n.copyNimTree - var openStreamProc.name = name_openStream - openStreamProc.params.insert 1, newIdentDefs(ident"T", msgRecName) - - if msgKind == msgRequest: - let timeout = msg.timeoutParam[0] - quote: `makeEth2Request`(`msgRecipient`, `msgProto`, `msgBytes`, - `ResponseRecord`, `timeout`) - elif msgId.intVal == 0: - quote: `sendBytes`(`sendTo`, `msgBytes`) - else: - quote: `sendMsg`(`msgRecipient`, `msgProto`, `msgBytes`) - else: - quote: `sendBytes`(`UntypedResponder`(`sendTo`).stream, `msgBytes`) - - msgSendProc.body = quote do: - let `msgRecipient` = `getRecipient`(`sendTo`) - `initWriter` - `appendParams` - `finalizeRequest` - return `sendCall` + sendProc.def.params[1][1] = P2PStream protocol.outProcRegistrations.add( newCall(registerMsg, protocol.protocolInfoVar, newLit(msgName), thunkName, - msgProto, + getRequestProtoName(msg.procDef), newTree(nnkBracketExpr, messagePrinter, msgRecName))) result.implementProtocolInit = proc (p: P2PProtocol): NimNode = diff --git a/beacon_chain/libp2p_backends_common.nim b/beacon_chain/libp2p_backends_common.nim new file mode 100644 index 000000000..24a4e6919 --- /dev/null +++ b/beacon_chain/libp2p_backends_common.nim @@ -0,0 +1,42 @@ +# included from libp2p_backend + +proc `$`*(peer: Peer): string = $peer.id + +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = + # TODO: How should we notify the other peer? + if peer.connectionState notin {Disconnecting, Disconnected}: + peer.connectionState = Disconnecting + await peer.network.daemon.disconnect(peer.id) + peer.connectionState = Disconnected + peer.network.peers.del(peer.id) + +template raisePeerDisconnected(msg: string, r: DisconnectionReason) = + var e = newException(PeerDisconnected, msg) + e.reason = r + raise e + +proc disconnectAndRaise(peer: Peer, + reason: DisconnectionReason, + msg: string) {.async.} = + let r = reason + await peer.disconnect(r) + raisePeerDisconnected(msg, r) + +proc getCompressedMsgId*(MsgType: type): CompressedMsgId = + mixin msgId, msgProtocol, protocolInfo + (protocolIdx: MsgType.msgProtocol.protocolInfo.index, + methodId: MsgType.msgId) + +proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = + ## This procs awaits a specific P2P message. + ## Any messages received while waiting will be dispatched to their + ## respective handlers. The designated message handler will also run + ## to completion before the future returned by `nextMsg` is resolved. + let awaitedMsgId = getCompressedMsgId(MsgType) + let f = getOrDefault(peer.awaitedMessages, awaitedMsgId) + if not f.isNil: + return Future[MsgType](f) + + initFuture result + peer.awaitedMessages[awaitedMsgId] = result + diff --git a/beacon_chain/libp2p_spec_backend.nim b/beacon_chain/libp2p_spec_backend.nim index c53a39699..3d57db53b 100644 --- a/beacon_chain/libp2p_spec_backend.nim +++ b/beacon_chain/libp2p_spec_backend.nim @@ -24,6 +24,17 @@ type EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers + Peer* = ref object + network*: Eth2Node + id*: PeerID + lastSentMsgId*: uint64 + rpcStream*: P2PStream + connectionState*: ConnectionState + awaitedMessages: Table[CompressedMsgId, FutureBase] + outstandingRequests*: seq[Deque[OutstandingRequest]] + protocolStates*: seq[RootRef] + maxInactivityAllowed: Duration + ConnectionState* = enum None, Connecting, @@ -63,24 +74,12 @@ type stream*: P2PStream protocolInfo*: ProtocolInfo - Peer* = ref object - network*: Eth2Node - id*: PeerID - lastSentMsgId*: uint64 - rpcStream*: P2PStream - connectionState*: ConnectionState - protocolStates*: seq[RootRef] - maxInactivityAllowed: Duration - awaitedMessages: Table[CompressedMsgId, FutureBase] - outstandingRequests*: seq[Deque[OutstandingRequest]] - MessageInfo* = object id*: int name*: string # Private fields: thunk*: ThunkProc - libp2pProtocol: string printer*: MessageContentPrinter nextMsgResolver*: NextMsgResolver requestResolver*: RequestResolver @@ -135,25 +134,6 @@ type PeerDisconnected* = object of P2PBackendError reason*: DisconnectionReason -var gProtocols: seq[ProtocolInfo] - -template allProtocols: auto = {.gcsafe.}: gProtocols - -const - HandshakeTimeout = FaultOrError - # TODO: this doesn't seem right. - # We should lobby for more disconnection reasons. - -proc `$`*(peer: Peer): string = $peer.id - -proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} = - await stream.transp.readExactly(addr result, sizeof result) - -proc appendPackedObject(stream: OutputStreamVar, value: auto) = - let valueAsBytes = cast[ptr byte](unsafeAddr(value)) - stream.append makeOpenArray(valueAsBytes, sizeof(value)) - -type PeerLoopExitReason = enum Success UnsupportedCompression @@ -162,18 +142,21 @@ type InactivePeer InternalError -proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = - # TODO: How should we notify the other peer? - if peer.connectionState notin {Disconnecting, Disconnected}: - peer.connectionState = Disconnecting - await peer.network.daemon.disconnect(peer.id) - peer.connectionState = Disconnected - peer.network.peers.del(peer.id) +const + HandshakeTimeout = FaultOrError + BreachOfProtocol* = FaultOrError + # TODO: We should lobby for more disconnection reasons. -template raisePeerDisconnected(msg: string, r: DisconnectionReason) = - var e = newException(PeerDisconnected, msg) - e.reason = r - raise e +proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} = + await stream.transp.readExactly(addr result, sizeof result) + +proc appendPackedObject(stream: OutputStreamVar, value: auto) = + let valueAsBytes = cast[ptr byte](unsafeAddr(value)) + stream.append makeOpenArray(valueAsBytes, sizeof(value)) + +include libp2p_backends_common +include eth/p2p/p2p_backends_helpers +include eth/p2p/p2p_tracing proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} @@ -189,11 +172,6 @@ proc init*(node: Eth2Node) {.async.} = await node.daemon.addHandler(@[beaconChainProtocol], handleConnectingBeaconChainPeer) -proc getCompressedMsgId*(MsgType: type): CompressedMsgId = - mixin msgId, msgProtocol, protocolInfo - (protocolIdx: MsgType.msgProtocol.protocolInfo.index, - methodId: MsgType.msgId) - proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer = new result result.id = id @@ -230,16 +208,6 @@ proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} = proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} = let peer = daemon.peerFromStream(stream) -proc disconnectAndRaise(peer: Peer, - reason: DisconnectionReason, - msg: string) {.async.} = - let r = reason - await peer.disconnect(r) - raisePeerDisconnected(msg, r) - -include eth/p2p/p2p_backends_helpers -include eth/p2p/p2p_tracing - proc accepts(d: Dispatcher, methodId: uint16): bool = methodId.int < d.messages.len @@ -309,19 +277,6 @@ proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] = return sendMsg(responder.peer, data) -proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = - ## This procs awaits a specific P2P message. - ## Any messages received while waiting will be dispatched to their - ## respective handlers. The designated message handler will also run - ## to completion before the future returned by `nextMsg` is resolved. - let wantedId = MsgType.getCompressedMsgId - let f = peer.awaitedMessages[wantedId] - if not f.isNil: - return Future[MsgType](f) - - initFuture result - peer.awaitedMessages[wantedId] = result - proc dispatchMessages*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream): Future[PeerLoopExitReason] {.async.} = while true: @@ -428,12 +383,6 @@ proc initProtocol(name: string, version: int, result.peerStateInitializer = peerInit result.networkStateInitializer = networkInit -proc setEventHandlers(p: ProtocolInfo, - handshake: HandshakeStep, - disconnectHandler: DisconnectionHandler) = - p.handshake = handshake - p.disconnectHandler = disconnectHandler - proc registerProtocol(protocol: ProtocolInfo) = # TODO: This can be done at compile-time in the future let pos = lowerBound(gProtocols, protocol) @@ -441,6 +390,12 @@ proc registerProtocol(protocol: ProtocolInfo) = for i in 0 ..< gProtocols.len: gProtocols[i].index = i +proc setEventHandlers(p: ProtocolInfo, + handshake: HandshakeStep, + disconnectHandler: DisconnectionHandler) = + p.handshake = handshake + p.disconnectHandler = disconnectHandler + proc registerMsg(protocol: ProtocolInfo, id: int, name: string, thunk: ThunkProc, @@ -523,7 +478,7 @@ proc implementSendProcBody(sendProc: SendProc) = # `sendMsg` call. quote: return `sendCall` - sendProc.implementBody(preludeGenerator, sendCallGenerator) + sendProc.useStandardBody(preludeGenerator, sendCallGenerator) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = let @@ -566,6 +521,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.SerializationFormat = Format p.useRequestIds = true + result.ReqIdType = ident "uint64" result.ResponderType = ResponderWithId result.afterProtocolInit = proc (p: P2PProtocol) = @@ -602,7 +558,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = thunkName = ident(msgName & "_thunk") awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, userHandlerParams) - var thunkProc = quote do: + msg.defineThunk quote do: proc `thunkName`(`peerVar`: `Peer`, `stream`: `P2PStream`, `reqIdVar`: uint64, @@ -612,18 +568,14 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = `awaitUserHandler` `callResolvePendingFutures` - protocol.outRecvProcs.add thunkProc - ## ## Implement Senders and Handshake ## var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake)) implementSendProcBody sendProc - protocol.outSendProcs.add sendProc.allDefs if msg.kind == msgHandshake: - protocol.outSendProcs.add msg.genHandshakeTemplate(sendProc.def.name, - handshakeImpl, nextMsg) + discard msg.createHandshakeTemplate(sendProc.def.name, handshakeImpl, nextMsg) protocol.outProcRegistrations.add( newCall(registerMsg, diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 268496968..9ab26f2fb 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -74,7 +74,7 @@ p2pProtocol BeaconSync(version = 1, shortName = "bcs", networkState = BeaconSyncState): - onPeerConnected do(peer: Peer): + onPeerConnected do (peer: Peer): let protocolVersion = 1 # TODO: Spec doesn't specify this yet node = peer.networkState.node