diff --git a/eth/p2p/p2p_backends_helpers.nim b/eth/p2p/p2p_backends_helpers.nim index 5e8f08d..350e112 100644 --- a/eth/p2p/p2p_backends_helpers.nim +++ b/eth/p2p/p2p_backends_helpers.nim @@ -38,73 +38,3 @@ proc initProtocolStates(peer: Peer, protocols: openarray[ProtocolInfo]) = if peerStateInit != nil: peer.protocolStates[protocol.index] = peerStateInit(peer) -proc resolveFuture[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} = - var f = Future[MsgType](future) - doAssert(not f.finished()) - f.complete(cast[ptr MsgType](msg)[]) - -proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} = - var f = Future[Option[MsgType]](future) - if not f.finished: - if msg != nil: - f.complete some(cast[ptr MsgType](msg)[]) - else: - f.complete none(MsgType) - else: - # This future was already resolved, but let's do some sanity checks - # here. The only reasonable explanation is that the request should - # have timed out. - if msg != nil: - if f.read.isSome: - doAssert false, "trying to resolve a request twice" - else: - doAssert false, "trying to resolve a timed out request with a value" - else: - try: - if not f.read.isSome: - doAssert false, "a request timed out twice" - # This can except when the future still completes with an error. - # E.g. the `sendMsg` fails because of an already closed transport or a - # broken pipe - except TransportOsError as e: - # E.g. broken pipe - trace "TransportOsError during request", err = e.msg - except TransportError: - trace "Transport got closed during request" - except Exception as e: - debug "Exception in requestResolver()", exc = e.name, err = e.msg - raise e - -proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) = - sendFut.addCallback() do (arg: pointer): - if not sendFut.error.isNil: - resFut.fail(sendFut.error) - -proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} = - result = "" - # TODO: uncommenting the line below increases the compile-time - # tremendously (for reasons not yet known) - # result = $(cast[ptr MsgType](msg)[]) - -proc disconnectAndRaise(peer: Peer, - reason: DisconnectionReason, - msg: string) {.async.} - -proc handshakeImpl[T](peer: Peer, - sendFut: Future[void], - responseFut: Future[T], - timeout: Duration): Future[T] {.async.} = - sendFut.addCallback do (arg: pointer) {.gcsafe.}: - if sendFut.failed: - debug "Handshake message not delivered", peer - - doAssert timeout.milliseconds > 0 - yield responseFut or sleepAsync(timeout) - if not responseFut.finished: - await disconnectAndRaise(peer, HandshakeTimeout, - "Protocol handshake was not received in time.") - elif responseFut.failed: - raise responseFut.error - else: - return responseFut.read - diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index cfa18e0..bf3ef22 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -58,6 +58,49 @@ chronicles.formatIt(Peer): $(it.remote) include p2p_backends_helpers +proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} = + var f = Future[Option[MsgType]](future) + if not f.finished: + if msg != nil: + f.complete some(cast[ptr MsgType](msg)[]) + else: + f.complete none(MsgType) + else: + # This future was already resolved, but let's do some sanity checks + # here. The only reasonable explanation is that the request should + # have timed out. + if msg != nil: + if f.read.isSome: + doAssert false, "trying to resolve a request twice" + else: + doAssert false, "trying to resolve a timed out request with a value" + else: + try: + if not f.read.isSome: + doAssert false, "a request timed out twice" + # This can except when the future still completes with an error. + # E.g. the `sendMsg` fails because of an already closed transport or a + # broken pipe + except TransportOsError as e: + # E.g. broken pipe + trace "TransportOsError during request", err = e.msg + except TransportError: + trace "Transport got closed during request" + except Exception as e: + debug "Exception in requestResolver()", exc = e.name, err = e.msg + raise e + +proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) = + sendFut.addCallback() do (arg: pointer): + if not sendFut.error.isNil: + resFut.fail(sendFut.error) + +proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} = + result = "" + # TODO: uncommenting the line below increases the compile-time + # tremendously (for reasons not yet known) + # result = $(cast[ptr MsgType](msg)[]) + proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.gcsafe, async.} @@ -73,6 +116,24 @@ proc disconnectAndRaise(peer: Peer, await peer.disconnect(r) raisePeerDisconnected(msg, r) +proc handshakeImpl[T](peer: Peer, + sendFut: Future[void], + responseFut: Future[T], + timeout: Duration): Future[T] {.async.} = + sendFut.addCallback do (arg: pointer) {.gcsafe.}: + if sendFut.failed: + debug "Handshake message not delivered", peer + + doAssert timeout.milliseconds > 0 + yield responseFut or sleepAsync(timeout) + if not responseFut.finished: + await disconnectAndRaise(peer, HandshakeTimeout, + "Protocol handshake was not received in time.") + elif responseFut.failed: + raise responseFut.error + else: + return responseFut.read + var gDevp2pInfo: ProtocolInfo template devp2pInfo: auto = {.gcsafe.}: gDevp2pInfo