diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index 0fcda1a..c54f856 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -38,6 +38,7 @@ type protocolStates*: seq[RootRef] outstandingRequests*: seq[Deque[OutstandingRequest]] awaitedMessages*: seq[FutureBase] + messageLoop*: Future[void] when useSnappy: snappyEnabled*: bool diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index 0acf941..59e8439 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -255,6 +255,16 @@ proc supports*(peer: Peer, Protocol: type): bool {.inline.} = template perPeerMsgId(peer: Peer, MsgType: type): int = perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId) +proc join*(peer: Peer): Future[void] = + ## Wait until `peer` is alive. + var retFuture = newFuture[void]("rlpx.peer.join") + proc continuation(udata: pointer) = retFuture.complete() + if not peer.messageLoop.finished: + peer.messageLoop.addCallback(continuation) + else: + retFuture.complete() + return retFuture + proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter) = let baseMsgId = peer.dispatcher.protocolOffsets[p.index] @@ -1277,9 +1287,9 @@ proc postHelloSteps(peer: Peer, h: devp2p.hello) {.async.} = # the other peer may arrrive too early and be processed before # the handshake code got a change to wait for them. # - var messageProcessingLoop = peer.dispatchMessages() + peer.messageLoop = peer.dispatchMessages() - messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} = + peer.messageLoop.callback = proc(p: pointer) {.gcsafe.} = if messageProcessingLoop.failed: debug "Ending dispatchMessages loop", peer, err = messageProcessingLoop.error.msg