fix(transport): tcp accept fixes (#1170)

Address the comments in https://github.com/vacp2p/nim-libp2p/pull/1164
This commit is contained in:
diegomrsantos 2024-09-09 13:49:33 +02:00 committed by GitHub
parent d98152f266
commit 8070b21825
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 13 additions and 2 deletions

View File

@ -226,10 +226,17 @@ method accept*(self: TcpTransport): Future[Connection] =
proc impl( proc impl(
self: TcpTransport self: TcpTransport
): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} = ): Future[Connection] {.async: (raises: [transport.TransportError, CancelledError]).} =
proc cancelAcceptFuts() =
for fut in self.acceptFuts:
if not fut.completed():
fut.cancel()
if not self.running: if not self.running:
raise newTransportClosedError() raise newTransportClosedError()
if self.acceptFuts.len <= 0: if self.servers.len == 0:
raise (ref TcpTransportError)(msg: "No listeners configured")
elif self.acceptFuts.len == 0:
# Holds futures representing ongoing accept calls on multiple servers. # Holds futures representing ongoing accept calls on multiple servers.
self.acceptFuts = self.servers.mapIt(it.accept()) self.acceptFuts = self.servers.mapIt(it.accept())
@ -239,7 +246,10 @@ method accept*(self: TcpTransport): Future[Connection] =
# Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers. # Waits for any one of these futures to complete, indicating that a new connection has been accepted on one of the servers.
await one(self.acceptFuts) await one(self.acceptFuts)
except ValueError: except ValueError:
raise (ref TcpTransportError)(msg: "No listeners configured") raiseAssert "Accept futures should not be empty"
except CancelledError as exc:
cancelAcceptFuts()
raise exc
index = self.acceptFuts.find(finished) index = self.acceptFuts.find(finished)
# A new connection has been accepted. The corresponding server should immediately start accepting another connection. # A new connection has been accepted. The corresponding server should immediately start accepting another connection.
@ -261,6 +271,7 @@ method accept*(self: TcpTransport): Future[Connection] =
except common.TransportError as exc: # Needed for chronos 4.0.0 support except common.TransportError as exc: # Needed for chronos 4.0.0 support
raise (ref TcpTransportError)(msg: exc.msg, parent: exc) raise (ref TcpTransportError)(msg: exc.msg, parent: exc)
except CancelledError as exc: except CancelledError as exc:
cancelAcceptFuts()
raise exc raise exc
if not self.running: # Stopped while waiting if not self.running: # Stopped while waiting