Install cancel callback on whole connection future (#508)

This commit is contained in:
KonradStaniec 2022-05-30 10:28:10 +02:00 committed by GitHub
parent 00ed6ad312
commit dffaa78cbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 14 deletions

View File

@ -242,16 +242,9 @@ proc generateNewUniqueSocket[A](
return none[UtpSocket[A]]() return none[UtpSocket[A]]()
proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= proc innerConnect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.} =
debug "Initiating connection", dst = s.socketKey
let startFut = s.startOutgoingSocket() let startFut = s.startOutgoingSocket()
startFut.cancelCallback = proc(udata: pointer) {.gcsafe.} =
# if for some reason the future is cancelled, destroy socket to clear it
# from the active socket list
s.destroy()
try: try:
await startFut await startFut
utp_success_outgoing.inc() utp_success_outgoing.inc()
@ -262,32 +255,62 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}=
debug "Outgoing connection timed-out", dst = s.socketKey debug "Outgoing connection timed-out", dst = s.socketKey
s.destroy() s.destroy()
return err(OutgoingConnectionError(kind: ConnectionTimedOut)) return err(OutgoingConnectionError(kind: ConnectionTimedOut))
except CancelledError as exc:
s.destroy()
debug "Connection cancelled", dst = s.socketKey
raise exc
proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] =
info "Initiating connection", dst = s.socketKey
# Create inner future, to make sure we are installing cancelCallback
# on whole connection future, and not only part of it
let connectionFuture = s.innerConnect()
connectionFuture.cancelCallback = proc(udata: pointer) {.gcsafe.} =
debug "Connection cancel callback fired",
socketKey = s.socketKey
# if for some reason the future is cancelled, destroy socket to clear it
# from the active socket list
s.destroy()
return connectionFuture
proc socketAlreadyExists[A](): ConnectionResult[A] =
return err(OutgoingConnectionError(kind: SocketAlreadyExists))
proc socketAlreadyExistsFut[A](): Future[ConnectionResult[A]] =
let fut = newFuture[ConnectionResult[A]]()
fut.complete(socketAlreadyExists[A]())
return fut
# Connect to provided address # Connect to provided address
# Reference implementation: # Reference implementation:
# https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 # https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
proc connectTo*[A]( proc connectTo*[A](
r: UtpRouter[A], address: A): Future[ConnectionResult[A]] {.async.} = r: UtpRouter[A], address: A): Future[ConnectionResult[A]] =
let maybeSocket = r.generateNewUniqueSocket(address) let maybeSocket = r.generateNewUniqueSocket(address)
if (maybeSocket.isNone()): if (maybeSocket.isNone()):
return err(OutgoingConnectionError(kind: SocketAlreadyExists)) return socketAlreadyExistsFut[A]()
else: else:
let socket = maybeSocket.unsafeGet() let socket = maybeSocket.unsafeGet()
return await socket.connect() let connFut = socket.connect()
return connFut
# Connect to provided address with provided connection id, if socket with this id # Connect to provided address with provided connection id, if socket with this id
# and address already exsits return error # and address already exsits return error
proc connectTo*[A]( proc connectTo*[A](
r: UtpRouter[A], address: A, connectionId: uint16): r: UtpRouter[A], address: A, connectionId: uint16):
Future[ConnectionResult[A]] {.async.} = Future[ConnectionResult[A]] =
let socket = newOutgoingSocket[A]( let socket = newOutgoingSocket[A](
address, r.sendCb, r.socketConfig, connectionId, r.rng[]) address, r.sendCb, r.socketConfig, connectionId, r.rng[])
if (r.registerIfAbsent(socket)): if (r.registerIfAbsent(socket)):
return await socket.connect() let connFut = socket.connect()
return connFut
else: else:
return err(OutgoingConnectionError(kind: SocketAlreadyExists)) return socketAlreadyExistsFut[A]()
proc shutdown*[A](r: UtpRouter[A]) = proc shutdown*[A](r: UtpRouter[A]) =
# stop processing any new packets and close all sockets in background without # stop processing any new packets and close all sockets in background without

View File

@ -335,6 +335,8 @@ procSuite "Utp router unit tests":
await connectFuture.cancelAndWait() await connectFuture.cancelAndWait()
await waitUntil(proc (): bool = router.len() == 0)
check: check:
router.len() == 0 router.len() == 0