New state for quic connection: disconnecting

When a quic connection is disconnecting, close the
send loop and the incoming udp connection.
This commit is contained in:
Mark Spanbroek 2020-12-21 14:35:12 +01:00 committed by markspanbroek
parent 2aee8ff51e
commit 15e3c4cf1c
8 changed files with 122 additions and 58 deletions

View File

@ -14,7 +14,6 @@ type
udp: DatagramTransport udp: DatagramTransport
quic: QuicConnection quic: QuicConnection
loop: Future[void] loop: Future[void]
closed: bool
onClose: proc() onClose: proc()
IncomingConnection = ref object of Connection IncomingConnection = ref object of Connection
OutgoingConnection = ref object of Connection OutgoingConnection = ref object of Connection
@ -40,18 +39,36 @@ proc startSending(connection: Connection, remote: TransportAddress) =
proc stopSending(connection: Connection) {.async.} = proc stopSending(connection: Connection) {.async.} =
await connection.loop.cancelAndWait() await connection.loop.cancelAndWait()
method closeUdp(connection: Connection) {.async, base.} =
discard
method closeUdp(connection: OutgoingConnection) {.async.} =
await connection.udp.closeWait()
proc disconnect(connection: Connection) {.async.} =
await connection.stopSending()
await connection.closeUdp()
if connection.onClose != nil:
connection.onClose()
proc newIncomingConnection*(udp: DatagramTransport, proc newIncomingConnection*(udp: DatagramTransport,
remote: TransportAddress): Connection = remote: TransportAddress): Connection =
let datagram = Datagram(data: udp.getMessage()) let datagram = Datagram(data: udp.getMessage())
let quic = newQuicServerConnection(udp.localAddress, remote, datagram) let quic = newQuicServerConnection(udp.localAddress, remote, datagram)
result = IncomingConnection(udp: udp, quic: quic) let connection = IncomingConnection(udp: udp, quic: quic)
result.startSending(remote) quic.disconnect = proc {.async.} =
await connection.disconnect()
connection.startSending(remote)
connection
proc newOutgoingConnection*(udp: DatagramTransport, proc newOutgoingConnection*(udp: DatagramTransport,
remote: TransportAddress): Connection = remote: TransportAddress): Connection =
let quic = newQuicClientConnection(udp.localAddress, remote) let quic = newQuicClientConnection(udp.localAddress, remote)
result = OutgoingConnection(udp: udp, quic: quic) let connection = OutgoingConnection(udp: udp, quic: quic)
result.startSending(remote) quic.disconnect = proc {.async.} =
await connection.disconnect()
connection.startSending(remote)
connection
proc startHandshake*(connection: Connection) = proc startHandshake*(connection: Connection) =
connection.quic.send() connection.quic.send()
@ -66,21 +83,8 @@ proc openStream*(connection: Connection): Future[Stream] {.async.} =
proc incomingStream*(connection: Connection): Future[Stream] {.async.} = proc incomingStream*(connection: Connection): Future[Stream] {.async.} =
result = await connection.quic.incomingStream() result = await connection.quic.incomingStream()
method closeUdp(connection: Connection) {.async, base.} =
discard
method closeUdp(connection: OutgoingConnection) {.async.} =
await connection.udp.closeWait()
proc drop*(connection: Connection) {.async.} = proc drop*(connection: Connection) {.async.} =
if not connection.closed: await connection.quic.drop()
connection.closed = true
await connection.stopSending()
await connection.closeUdp()
if connection.onClose != nil:
connection.onClose()
connection.quic.drop()
proc close*(connection: Connection) {.async.} = proc close*(connection: Connection) {.async.} =
await connection.quic.close() await connection.quic.close()
await connection.drop()

View File

@ -31,7 +31,7 @@ method openStream(state: ClosedConnection): Future[Stream] {.async.} =
method close(state: ClosedConnection) {.async.} = method close(state: ClosedConnection) {.async.} =
discard discard
method drop(state: ClosedConnection) = method drop(state: ClosedConnection) {.async.} =
discard discard
{.pop.} {.pop.}

View File

@ -0,0 +1,51 @@
import pkg/chronos
import ../../../udp/datagram
import ../../quicconnection
import ../../connectionid
import ../../stream
import ./closedstate
type
DisconnectingConnection* = ref object of ConnectionState
connection: QuicConnection
disconnect: Future[void]
ids: seq[ConnectionId]
proc newDisconnectingConnection*(ids: seq[ConnectionId]):
DisconnectingConnection =
DisconnectingConnection(ids: ids)
proc callDisconnect(connection: QuicConnection) {.async.} =
if connection.disconnect != nil:
await connection.disconnect()
{.push locks: "unknown".}
method ids*(state: DisconnectingConnection): seq[ConnectionId] =
state.ids
method enter(state: DisconnectingConnection, connection: QuicConnection) =
state.connection = connection
state.disconnect = callDisconnect(connection)
method leave(state: DisconnectingConnection) =
state.connection = nil
method send(state: DisconnectingConnection) =
raise newException(ClosedConnectionError, "connection is disconnecting")
method receive(state: DisconnectingConnection, datagram: Datagram) =
discard
method openStream(state: DisconnectingConnection): Future[Stream] {.async.} =
raise newException(ClosedConnectionError, "connection is disconnecting")
method close(state: DisconnectingConnection) {.async.} =
await state.disconnect
state.connection.switch(newClosedConnection())
method drop(state: DisconnectingConnection) {.async.} =
await state.disconnect
state.connection.switch(newClosedConnection())
{.pop.}

View File

@ -4,6 +4,7 @@ import ../../quicconnection
import ../../connectionid import ../../connectionid
import ../../stream import ../../stream
import ../../timeout import ../../timeout
import ./disconnectingstate
import ./closedstate import ./closedstate
type type
@ -54,8 +55,13 @@ method openStream(state: DrainingConnection): Future[Stream] {.async.} =
method close(state: DrainingConnection) {.async.} = method close(state: DrainingConnection) {.async.} =
await state.done.wait() await state.done.wait()
let disconnecting = newDisconnectingConnection(state.ids)
state.connection.switch(disconnecting)
await disconnecting.close()
method drop(state: DrainingConnection) = method drop(state: DrainingConnection) {.async.} =
state.connection.switch(newClosedConnection()) let disconnecting = newDisconnectingConnection(state.ids)
state.connection.switch(disconnecting)
await disconnecting.drop()
{.pop.} {.pop.}

View File

@ -6,7 +6,7 @@ import ../../stream
import ../connection import ../connection
import ../streams import ../streams
import ./closingstate import ./closingstate
import ./closedstate import ./disconnectingstate
type type
OpenConnection* = ref object of ConnectionState OpenConnection* = ref object of ConnectionState
@ -46,8 +46,10 @@ method close(state: OpenConnection) {.async.} =
state.quicConnection.switch(closing) state.quicConnection.switch(closing)
await closing.close() await closing.close()
method drop(state: OpenConnection) = method drop(state: OpenConnection) {.async.} =
state.quicConnection.switch(newClosedConnection()) let disconnecting = newDisconnectingConnection(state.ids)
state.quicConnection.switch(disconnecting)
await disconnecting.drop()
method `onNewId=`*(state: OpenConnection, callback: IdCallback) = method `onNewId=`*(state: OpenConnection, callback: IdCallback) =
state.ngtcp2Connection.onNewId = callback state.ngtcp2Connection.onNewId = callback

View File

@ -10,6 +10,7 @@ type
incoming*: AsyncQueue[Stream] incoming*: AsyncQueue[Stream]
handshake*: AsyncEvent handshake*: AsyncEvent
closed*: AsyncEvent closed*: AsyncEvent
disconnect*: proc(): Future[void] {.gcsafe.}
ConnectionState* = ref object of RootObj ConnectionState* = ref object of RootObj
IdCallback* = proc(id: ConnectionId) IdCallback* = proc(id: ConnectionId)
ConnectionError* = object of IOError ConnectionError* = object of IOError
@ -34,7 +35,7 @@ method receive*(state: ConnectionState, datagram: Datagram) =
method openStream*(state: ConnectionState): Future[Stream] = method openStream*(state: ConnectionState): Future[Stream] =
doAssert false # override this method doAssert false # override this method
method drop*(state: ConnectionState) = method drop*(state: ConnectionState): Future[void] =
doAssert false # override this method doAssert false # override this method
method close*(state: ConnectionState): Future[void] = method close*(state: ConnectionState): Future[void] =
@ -88,5 +89,5 @@ proc incomingStream*(connection: QuicConnection): Future[Stream] =
proc close*(connection: QuicConnection): Future[void] = proc close*(connection: QuicConnection): Future[void] =
connection.state.close() connection.state.close()
proc drop*(connection: QuicConnection) = proc drop*(connection: QuicConnection): Future[void] =
connection.state.drop() connection.state.drop()

View File

@ -12,20 +12,20 @@ suite "quic connection":
asynctest "sends outgoing datagrams": asynctest "sends outgoing datagrams":
let client = newQuicClientConnection(zeroAddress, zeroAddress) let client = newQuicClientConnection(zeroAddress, zeroAddress)
defer: client.drop() defer: await client.drop()
client.send() client.send()
let datagram = await client.outgoing.get() let datagram = await client.outgoing.get()
check datagram.len > 0 check datagram.len > 0
asynctest "processes received datagrams": asynctest "processes received datagrams":
let client = newQuicClientConnection(zeroAddress, zeroAddress) let client = newQuicClientConnection(zeroAddress, zeroAddress)
defer: client.drop() defer: await client.drop()
client.send() client.send()
let datagram = await client.outgoing.get() let datagram = await client.outgoing.get()
let server = newQuicServerConnection(zeroAddress, zeroAddress, datagram) let server = newQuicServerConnection(zeroAddress, zeroAddress, datagram)
defer: server.drop() defer: await server.drop()
server.receive(datagram) server.receive(datagram)
@ -37,8 +37,8 @@ suite "quic connection":
asynctest "performs handshake": asynctest "performs handshake":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
defer: client.drop() defer: await client.drop()
defer: server.drop() defer: await server.drop()
check client.handshake.isSet() check client.handshake.isSet()
check server.handshake.isSet() check server.handshake.isSet()
@ -46,8 +46,8 @@ suite "quic connection":
asynctest "performs handshake multiple times": asynctest "performs handshake multiple times":
for i in 1..100: for i in 1..100:
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "returns the current connection ids": asynctest "returns the current connection ids":
let (client, server) = await setupConnection() let (client, server) = await setupConnection()
@ -57,8 +57,8 @@ suite "quic connection":
asynctest "notifies about id changes": asynctest "notifies about id changes":
let (client, server) = await setupConnection() let (client, server) = await setupConnection()
defer: client.drop defer: await client.drop
defer: server.drop defer: await server.drop
var newId: ConnectionId var newId: ConnectionId
server.onNewId = proc (id: ConnectionId) = server.onNewId = proc (id: ConnectionId) =
@ -72,12 +72,12 @@ suite "quic connection":
asynctest "fires event when closed": asynctest "fires event when closed":
let client = newQuicClientConnection(zeroAddress, zeroAddress) let client = newQuicClientConnection(zeroAddress, zeroAddress)
client.drop() await client.drop()
check client.closed.isSet() check client.closed.isSet()
asynctest "raises ConnectionError when closed": asynctest "raises ConnectionError when closed":
let connection = newQuicClientConnection(zeroAddress, zeroAddress) let connection = newQuicClientConnection(zeroAddress, zeroAddress)
connection.drop() await connection.drop()
expect ConnectionError: expect ConnectionError:
connection.send() connection.send()
@ -88,8 +88,8 @@ suite "quic connection":
expect ConnectionError: expect ConnectionError:
discard await connection.openStream() discard await connection.openStream()
test "has empty list of ids when closed": asynctest "has empty list of ids when closed":
let connection = newQuicClientConnection(zeroAddress, zeroAddress) let connection = newQuicClientConnection(zeroAddress, zeroAddress)
connection.drop() await connection.drop()
check connection.ids.len == 0 check connection.ids.len == 0

View File

@ -15,8 +15,8 @@ suite "streams":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
check client.openStream() != client.openStream() check client.openStream() != client.openStream()
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "closes stream": asynctest "closes stream":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -24,8 +24,8 @@ suite "streams":
await stream.close() await stream.close()
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "writes to stream": asynctest "writes to stream":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -35,8 +35,8 @@ suite "streams":
check client.outgoing.anyIt(it.data.contains(message)) check client.outgoing.anyIt(it.data.contains(message))
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "writes zero-length message": asynctest "writes zero-length message":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -46,8 +46,8 @@ suite "streams":
check datagram.len > 0 check datagram.len > 0
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "raises when reading from or writing to closed stream": asynctest "raises when reading from or writing to closed stream":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -60,8 +60,8 @@ suite "streams":
expect IOError: expect IOError:
await stream.write(@[1'u8, 2'u8, 3'u8]) await stream.write(@[1'u8, 2'u8, 3'u8])
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "accepts incoming streams": asynctest "accepts incoming streams":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -74,8 +74,8 @@ suite "streams":
check clientStream.id == serverStream.id check clientStream.id == serverStream.id
await simulation.cancelAndWait() await simulation.cancelAndWait()
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "reads from stream": asynctest "reads from stream":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -92,8 +92,8 @@ suite "streams":
check incoming == message check incoming == message
await simulation.cancelAndWait() await simulation.cancelAndWait()
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "writes long messages to stream": asynctest "writes long messages to stream":
let (client, server) = await performHandshake() let (client, server) = await performHandshake()
@ -108,8 +108,8 @@ suite "streams":
discard await incoming.read() discard await incoming.read()
await simulation.cancelAndWait() await simulation.cancelAndWait()
client.drop() await client.drop()
server.drop() await server.drop()
asynctest "handles packet loss": asynctest "handles packet loss":
@ -126,5 +126,5 @@ suite "streams":
check incoming == message check incoming == message
await simulation.cancelAndWait() await simulation.cancelAndWait()
client.drop() await client.drop()
server.drop() await server.drop()