diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 03e3579..0c486cf 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -242,16 +242,9 @@ proc generateNewUniqueSocket[A]( return none[UtpSocket[A]]() -proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= - debug "Initiating connection", dst = s.socketKey - +proc innerConnect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.} = let startFut = s.startOutgoingSocket() - startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} = - # if for some reason the future is cancelled, destroy socket to clear it - # from the active socket list - s.destroy() - try: await startFut utp_success_outgoing.inc() @@ -262,32 +255,62 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= debug "Outgoing connection timed-out", dst = s.socketKey s.destroy() return err(OutgoingConnectionError(kind: ConnectionTimedOut)) + except CancelledError as exc: + s.destroy() + debug "Connection cancelled", dst = s.socketKey + raise exc + +proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] = + info "Initiating connection", dst = s.socketKey + + # Create inner future, to make sure we are installing cancelCallback + # on whole connection future, and not only part of it + let connectionFuture = s.innerConnect() + + connectionFuture.cancelCallback = proc(udata: pointer) {.gcsafe.} = + debug "Connection cancel callback fired", + socketKey = s.socketKey + # if for some reason the future is cancelled, destroy socket to clear it + # from the active socket list + s.destroy() + + return connectionFuture + +proc socketAlreadyExists[A](): ConnectionResult[A] = + return err(OutgoingConnectionError(kind: SocketAlreadyExists)) + +proc socketAlreadyExistsFut[A](): Future[ConnectionResult[A]] = + let fut = newFuture[ConnectionResult[A]]() + fut.complete(socketAlreadyExists[A]()) + return fut # Connect to provided address # Reference implementation: # https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 proc connectTo*[A]( - r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.async.} = + r: UtpRouter[A], address: A): Future[ConnectionResult[A]] = let maybeSocket = r.generateNewUniqueSocket(address) if (maybeSocket.isNone()): - return err(OutgoingConnectionError(kind: SocketAlreadyExists)) + return socketAlreadyExistsFut[A]() else: let socket = maybeSocket.unsafeGet() - return await socket.connect() + let connFut = socket.connect() + return connFut # Connect to provided address with provided connection id, if socket with this id # and address already exsits return error proc connectTo*[A]( r: UtpRouter[A], address: A, connectionId: uint16): - Future[ConnectionResult[A]] {.async.} = + Future[ConnectionResult[A]] = let socket = newOutgoingSocket[A]( address, r.sendCb, r.socketConfig, connectionId, r.rng[]) if (r.registerIfAbsent(socket)): - return await socket.connect() + let connFut = socket.connect() + return connFut else: - return err(OutgoingConnectionError(kind: SocketAlreadyExists)) + return socketAlreadyExistsFut[A]() proc shutdown*[A](r: UtpRouter[A]) = # stop processing any new packets and close all sockets in background without diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index 5c4c76b..cb198cb 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -335,6 +335,8 @@ procSuite "Utp router unit tests": await connectFuture.cancelAndWait() + await waitUntil(proc (): bool = router.len() == 0) + check: router.len() == 0