mirror of https://github.com/vacp2p/nim-quic.git
parent
9dd9b528cf
commit
cdade74acf
|
@ -10,7 +10,7 @@ on:
|
|||
jobs:
|
||||
test:
|
||||
runs-on: ${{ matrix.os }}
|
||||
timeout-minutes: 20
|
||||
timeout-minutes: 10
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest, macOS-latest, windows-latest]
|
||||
|
|
|
@ -1,11 +1,17 @@
|
|||
import pkg/chronos
|
||||
import pkg/upraises
|
||||
import pkg/stew/results
|
||||
import std/options
|
||||
import ./udp/datagram
|
||||
import ./errors
|
||||
|
||||
export chronos
|
||||
export options
|
||||
export datagram
|
||||
export results
|
||||
export errors
|
||||
export upraises
|
||||
|
||||
template getOr*[T](o: Option[T], otherwise: untyped): T =
|
||||
if o.isSome():
|
||||
o.unsafeGet()
|
||||
else:
|
||||
otherwise
|
||||
|
|
|
@ -12,7 +12,7 @@ type
|
|||
udp: DatagramTransport
|
||||
quic: QuicConnection
|
||||
loop: Future[void]
|
||||
onClose: Opt[proc() {.gcsafe, upraises: [].}]
|
||||
onClose: Option[proc() {.gcsafe, upraises: [].}]
|
||||
closed: AsyncEvent
|
||||
IncomingConnection = ref object of Connection
|
||||
OutgoingConnection = ref object of Connection
|
||||
|
@ -27,7 +27,7 @@ proc `onRemoveId=`*(connection: Connection, callback: IdCallback) =
|
|||
connection.quic.onRemoveId = callback
|
||||
|
||||
proc `onClose=`*(connection: Connection, callback: proc() {.gcsafe, upraises: [].}) =
|
||||
connection.onClose = Opt.some(callback)
|
||||
connection.onClose = some callback
|
||||
|
||||
proc drop*(connection: Connection) {.async.} =
|
||||
await connection.quic.drop()
|
||||
|
@ -69,9 +69,8 @@ proc newIncomingConnection*(udp: DatagramTransport,
|
|||
let quic = newQuicServerConnection(udp.localAddress, remote, datagram)
|
||||
let closed = newAsyncEvent()
|
||||
let connection = IncomingConnection(udp: udp, quic: quic, closed: closed)
|
||||
proc onDisconnect {.async.} =
|
||||
quic.disconnect = some proc {.async.} =
|
||||
await connection.disconnect()
|
||||
quic.disconnect = Opt.some(onDisconnect)
|
||||
connection.startSending(remote)
|
||||
connection
|
||||
|
||||
|
@ -80,9 +79,8 @@ proc newOutgoingConnection*(udp: DatagramTransport,
|
|||
let quic = newQuicClientConnection(udp.localAddress, remote)
|
||||
let closed = newAsyncEvent()
|
||||
let connection = OutgoingConnection(udp: udp, quic: quic, closed: closed)
|
||||
proc onDisconnect {.async.} =
|
||||
quic.disconnect = some proc {.async.} =
|
||||
await connection.disconnect()
|
||||
quic.disconnect = Opt.some(onDisconnect)
|
||||
connection.startSending(remote)
|
||||
connection
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ proc newClosingConnection*(finalDatagram: Datagram, ids: seq[ConnectionId],
|
|||
state
|
||||
|
||||
proc sendFinalDatagram(state: ClosingConnection) =
|
||||
let connection = state.connection.valueOr: return
|
||||
let connection = state.connection.getOr: return
|
||||
try:
|
||||
connection.outgoing.putNoWait(state.finalDatagram)
|
||||
except AsyncQueueFullError:
|
||||
|
|
|
@ -6,7 +6,7 @@ import ./closedstate
|
|||
|
||||
type
|
||||
DisconnectingConnection* = ref object of ConnectionState
|
||||
connection: Opt[QuicConnection]
|
||||
connection: Option[QuicConnection]
|
||||
disconnect: Future[void]
|
||||
ids: seq[ConnectionId]
|
||||
|
||||
|
@ -15,7 +15,7 @@ proc newDisconnectingConnection*(ids: seq[ConnectionId]):
|
|||
DisconnectingConnection(ids: ids)
|
||||
|
||||
proc callDisconnect(connection: QuicConnection) {.async.} =
|
||||
let disconnect = connection.disconnect.valueOr: return
|
||||
let disconnect = connection.disconnect.getOr: return
|
||||
await disconnect()
|
||||
|
||||
{.push locks: "unknown".}
|
||||
|
@ -25,12 +25,12 @@ method ids*(state: DisconnectingConnection): seq[ConnectionId] =
|
|||
|
||||
method enter(state: DisconnectingConnection, connection: QuicConnection) =
|
||||
procCall enter(ConnectionState(state), connection)
|
||||
state.connection = Opt.some(connection)
|
||||
state.connection = some connection
|
||||
state.disconnect = callDisconnect(connection)
|
||||
|
||||
method leave(state: DisconnectingConnection) =
|
||||
procCall leave(ConnectionState(state))
|
||||
state.connection = Opt.none(QuicConnection)
|
||||
state.connection = QuicConnection.none
|
||||
|
||||
method send(state: DisconnectingConnection) =
|
||||
raise newException(ClosedConnectionError, "connection is disconnecting")
|
||||
|
@ -44,12 +44,12 @@ method openStream(state: DisconnectingConnection,
|
|||
|
||||
method close(state: DisconnectingConnection) {.async.} =
|
||||
await state.disconnect
|
||||
let connection = state.connection.valueOr: return
|
||||
let connection = state.connection.getOr: return
|
||||
connection.switch(newClosedConnection())
|
||||
|
||||
method drop(state: DisconnectingConnection) {.async.} =
|
||||
await state.disconnect
|
||||
let connection = state.connection.valueOr: return
|
||||
let connection = state.connection.getOr: return
|
||||
connection.switch(newClosedConnection())
|
||||
|
||||
{.pop.}
|
||||
|
|
|
@ -8,7 +8,7 @@ import ./closedstate
|
|||
|
||||
type
|
||||
DrainingConnection* = ref object of ConnectionState
|
||||
connection*: Opt[QuicConnection]
|
||||
connection*: Option[QuicConnection]
|
||||
ids: seq[ConnectionId]
|
||||
timeout: Timeout
|
||||
duration: Duration
|
||||
|
@ -33,14 +33,14 @@ push: {.locks: "unknown", upraises: [QuicError].}
|
|||
|
||||
method enter*(state: DrainingConnection, connection: QuicConnection) =
|
||||
procCall enter(ConnectionState(state), connection)
|
||||
state.connection = Opt.some(connection)
|
||||
state.connection = some connection
|
||||
state.timeout = newTimeout(proc {.upraises: [].} = state.onTimeout())
|
||||
state.timeout.set(state.duration)
|
||||
|
||||
method leave(state: DrainingConnection) =
|
||||
procCall leave(ConnectionState(state))
|
||||
state.timeout.stop()
|
||||
state.connection = Opt.none(QuicConnection)
|
||||
state.connection = QuicConnection.none
|
||||
|
||||
method ids(state: DrainingConnection): seq[ConnectionId] {.upraises: [].} =
|
||||
state.ids
|
||||
|
@ -57,13 +57,13 @@ method openStream(state: DrainingConnection,
|
|||
|
||||
method close(state: DrainingConnection) {.async.} =
|
||||
await state.done.wait()
|
||||
let connection = state.connection.valueOr: return
|
||||
let connection = state.connection.getOr: return
|
||||
let disconnecting = newDisconnectingConnection(state.ids)
|
||||
connection.switch(disconnecting)
|
||||
await disconnecting.close()
|
||||
|
||||
method drop(state: DrainingConnection) {.async.} =
|
||||
let connection = state.connection.valueOr: return
|
||||
let connection = state.connection.getOr: return
|
||||
let disconnecting = newDisconnectingConnection(state.ids)
|
||||
connection.switch(disconnecting)
|
||||
await disconnecting.drop()
|
||||
|
|
|
@ -13,7 +13,7 @@ import ./openstreams
|
|||
|
||||
type
|
||||
OpenConnection* = ref object of ConnectionState
|
||||
quicConnection: Opt[QuicConnection]
|
||||
quicConnection: Option[QuicConnection]
|
||||
ngtcp2Connection: Ngtcp2Connection
|
||||
streams: OpenStreams
|
||||
|
||||
|
@ -31,17 +31,13 @@ proc openServerConnection*(local, remote: TransportAddress,
|
|||
|
||||
method enter(state: OpenConnection, connection: QuicConnection) =
|
||||
procCall enter(ConnectionState(state), connection)
|
||||
state.quicConnection = Opt.some(connection)
|
||||
# Workaround weird bug
|
||||
proc onNewId(id: ConnectionId) =
|
||||
state.quicConnection = some connection
|
||||
state.ngtcp2Connection.onNewId = some proc(id: ConnectionId) =
|
||||
if isNil(connection.onNewId): return
|
||||
connection.onNewId(id)
|
||||
state.ngtcp2Connection.onNewId = Opt.some(onNewId)
|
||||
|
||||
proc onRemoveId(id: ConnectionId) =
|
||||
state.ngtcp2Connection.onRemoveId = some proc(id: ConnectionId) =
|
||||
if isNil(connection.onRemoveId): return
|
||||
connection.onRemoveId(id)
|
||||
state.ngtcp2Connection.onRemoveId = Opt.some(onRemoveId)
|
||||
state.ngtcp2Connection.onSend = proc(datagram: Datagram) =
|
||||
errorAsDefect:
|
||||
connection.outgoing.putNoWait(datagram)
|
||||
|
@ -55,7 +51,7 @@ method leave(state: OpenConnection) =
|
|||
procCall leave(ConnectionState(state))
|
||||
state.streams.closeAll()
|
||||
state.ngtcp2Connection.destroy()
|
||||
state.quicConnection = Opt.none(QuicConnection)
|
||||
state.quicConnection = QuicConnection.none
|
||||
|
||||
method ids(state: OpenConnection): seq[ConnectionId] {.upraises: [].} =
|
||||
state.ngtcp2Connection.ids
|
||||
|
@ -65,7 +61,7 @@ method send(state: OpenConnection) =
|
|||
|
||||
method receive(state: OpenConnection, datagram: Datagram) =
|
||||
state.ngtcp2Connection.receive(datagram)
|
||||
let quicConnection = state.quicConnection.valueOr: return
|
||||
let quicConnection = state.quicConnection.getOr: return
|
||||
if state.ngtcp2Connection.isDraining:
|
||||
let duration = state.ngtcp2Connection.closingDuration()
|
||||
let ids = state.ids
|
||||
|
@ -75,14 +71,14 @@ method receive(state: OpenConnection, datagram: Datagram) =
|
|||
|
||||
method openStream(state: OpenConnection,
|
||||
unidirectional: bool): Future[Stream] {.async.} =
|
||||
let quicConnection = state.quicConnection.valueOr:
|
||||
let quicConnection = state.quicConnection.getOr:
|
||||
raise newException(QuicError, "connection is closed")
|
||||
await quicConnection.handshake.wait()
|
||||
result = state.ngtcp2Connection.openStream(unidirectional = unidirectional)
|
||||
state.streams.add(result)
|
||||
|
||||
method close(state: OpenConnection) {.async.} =
|
||||
let quicConnection = state.quicConnection.valueOr: return
|
||||
let quicConnection = state.quicConnection.getOr: return
|
||||
let finalDatagram = state.ngtcp2Connection.close()
|
||||
let duration = state.ngtcp2Connection.closingDuration()
|
||||
let ids = state.ids
|
||||
|
@ -91,7 +87,7 @@ method close(state: OpenConnection) {.async.} =
|
|||
await closing.close()
|
||||
|
||||
method drop(state: OpenConnection) {.async.} =
|
||||
let quicConnection = state.quicConnection.valueOr: return
|
||||
let quicConnection = state.quicConnection.getOr: return
|
||||
let disconnecting = newDisconnectingConnection(state.ids)
|
||||
quicConnection.switch(disconnecting)
|
||||
await disconnecting.drop()
|
||||
|
|
|
@ -63,4 +63,4 @@ proc newNgtcp2Client*(local, remote: TransportAddress): Ngtcp2Connection =
|
|||
addr result[]
|
||||
)
|
||||
|
||||
result.conn = Opt.some(conn)
|
||||
result.conn = conn.some
|
||||
|
|
|
@ -13,7 +13,7 @@ import ./pointers
|
|||
|
||||
type
|
||||
Ngtcp2Connection* = ref object
|
||||
conn*: Opt[ptr ngtcp2_conn]
|
||||
conn*: Option[ptr ngtcp2_conn]
|
||||
path*: Path
|
||||
buffer*: array[4096, byte]
|
||||
flowing*: AsyncEvent
|
||||
|
@ -21,20 +21,20 @@ type
|
|||
onSend*: proc(datagram: Datagram) {.gcsafe, upraises:[].}
|
||||
onIncomingStream*: proc(stream: Stream)
|
||||
onHandshakeDone*: proc()
|
||||
onNewId*: Opt[proc(id: ConnectionId)]
|
||||
onRemoveId*: Opt[proc(id: ConnectionId)]
|
||||
onNewId*: Option[proc(id: ConnectionId)]
|
||||
onRemoveId*: Option[proc(id: ConnectionId)]
|
||||
Ngtcp2ConnectionClosed* = object of QuicError
|
||||
|
||||
proc destroy*(connection: Ngtcp2Connection) =
|
||||
let conn = connection.conn.valueOr: return
|
||||
let conn = connection.conn.getOr: return
|
||||
connection.timeout.stop()
|
||||
ngtcp2_conn_del(conn)
|
||||
connection.conn = Opt.none(ptr ngtcp2_conn)
|
||||
connection.conn = none(ptr ngtcp2_conn)
|
||||
connection.onSend = nil
|
||||
connection.onIncomingStream = nil
|
||||
connection.onHandshakeDone = nil
|
||||
connection.onNewId = Opt.none(proc(id: ConnectionId))
|
||||
connection.onRemoveId = Opt.none(proc(id: ConnectionId))
|
||||
connection.onNewId = none proc(id: ConnectionId)
|
||||
connection.onRemoveId = none proc(id: ConnectionId)
|
||||
|
||||
proc handleTimeout(connection: Ngtcp2Connection) {.gcsafe, upraises:[].}
|
||||
|
||||
|
@ -48,14 +48,14 @@ proc newConnection*(path: Path): Ngtcp2Connection =
|
|||
|
||||
proc ids*(connection: Ngtcp2Connection): seq[ConnectionId] =
|
||||
let
|
||||
conn = connection.conn.valueOr: return
|
||||
conn = connection.conn.getOr: return
|
||||
amount = ngtcp2_conn_get_num_scid(conn)
|
||||
var scids = newSeq[ngtcp2_cid](amount)
|
||||
discard ngtcp2_conn_get_scid(conn, scids.toPtr)
|
||||
scids.mapIt(ConnectionId(it.data[0..<it.datalen]))
|
||||
|
||||
proc updateTimeout*(connection: Ngtcp2Connection) =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
let expiry = ngtcp2_conn_get_expiry(conn)
|
||||
|
@ -69,7 +69,7 @@ proc trySend(connection: Ngtcp2Connection,
|
|||
messagePtr: ptr byte = nil,
|
||||
messageLen: uint = 0,
|
||||
written: ptr int = nil): Datagram =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
var packetInfo: ngtcp2_pkt_info
|
||||
|
@ -127,7 +127,7 @@ proc send*(connection: Ngtcp2Connection,
|
|||
|
||||
proc tryReceive(connection: Ngtcp2Connection, datagram: openArray[byte],
|
||||
ecn: ECN) =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
var packetInfo: ngtcp2_pkt_info
|
||||
|
@ -154,14 +154,14 @@ proc receive*(connection: Ngtcp2Connection, datagram: Datagram) =
|
|||
connection.receive(datagram.data, datagram.ecn)
|
||||
|
||||
proc handleTimeout(connection: Ngtcp2Connection) =
|
||||
let conn = connection.conn.valueOr: return
|
||||
let conn = connection.conn.getOr: return
|
||||
|
||||
errorAsDefect:
|
||||
checkResult ngtcp2_conn_handle_expiry(conn, now())
|
||||
connection.send()
|
||||
|
||||
proc close*(connection: Ngtcp2Connection): Datagram =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
var packetInfo: ngtcp2_pkt_info
|
||||
|
@ -180,31 +180,31 @@ proc close*(connection: Ngtcp2Connection): Datagram =
|
|||
Datagram(data: data, ecn: ecn)
|
||||
|
||||
proc closingDuration*(connection: Ngtcp2Connection): Duration =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
3 * ngtcp2_conn_get_pto(conn).int64.nanoseconds
|
||||
|
||||
proc isDraining*(connection: Ngtcp2Connection): bool =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
ngtcp2_conn_is_in_draining_period(conn).bool
|
||||
|
||||
proc isHandshakeCompleted*(connection: Ngtcp2Connection): bool =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
ngtcp2_conn_get_handshake_completed(conn).bool
|
||||
|
||||
proc openUniStream*(connection: Ngtcp2Connection): int64 =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
checkResult ngtcp2_conn_open_uni_stream(conn, addr result, nil)
|
||||
|
||||
proc openBidiStream*(connection: Ngtcp2Connection): int64 =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
checkResult ngtcp2_conn_open_bidi_stream(conn, addr result, nil)
|
||||
|
@ -212,7 +212,7 @@ proc openBidiStream*(connection: Ngtcp2Connection): int64 =
|
|||
proc setStreamUserData*(connection: Ngtcp2Connection,
|
||||
streamId: int64,
|
||||
userdata: pointer) =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
checkResult ngtcp2_conn_set_stream_user_data(conn, streamId, userdata)
|
||||
|
@ -220,14 +220,14 @@ proc setStreamUserData*(connection: Ngtcp2Connection,
|
|||
proc extendStreamOffset*(connection: Ngtcp2Connection,
|
||||
streamId: int64,
|
||||
amount: uint64) =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
checkResult conn.ngtcp2_conn_extend_max_stream_offset(streamId, amount)
|
||||
conn.ngtcp2_conn_extend_max_offset(amount)
|
||||
|
||||
proc shutdownStream*(connection: Ngtcp2Connection, streamId: int64) =
|
||||
let conn = connection.conn.valueOr:
|
||||
let conn = connection.conn.getOr:
|
||||
raise newException(Ngtcp2ConnectionClosed, "connection no longer exists")
|
||||
|
||||
checkResult ngtcp2_conn_shutdown_stream(conn, streamId, 0)
|
||||
|
|
|
@ -26,7 +26,7 @@ proc getNewConnectionId(conn: ptr ngtcp2_conn,
|
|||
|
||||
let
|
||||
connection = cast[Ngtcp2Connection](userData)
|
||||
onNewId = connection.onNewId.valueOr: return
|
||||
onNewId = connection.onNewId.getOr: return
|
||||
onNewId(newId)
|
||||
|
||||
proc removeConnectionId(conn: ptr ngtcp2_conn,
|
||||
|
@ -34,7 +34,7 @@ proc removeConnectionId(conn: ptr ngtcp2_conn,
|
|||
userData: pointer): cint {.cdecl.} =
|
||||
let
|
||||
connection = cast[Ngtcp2Connection](userData)
|
||||
onRemoveId = connection.onRemoveId.valueOr: return
|
||||
onRemoveId = connection.onRemoveId.getOr: return
|
||||
onRemoveId(id.toConnectionId)
|
||||
|
||||
proc installConnectionIdCallback*(callbacks: var ngtcp2_conn_callbacks) =
|
||||
|
|
|
@ -67,7 +67,7 @@ proc newNgtcp2Server*(local, remote: TransportAddress,
|
|||
addr result[]
|
||||
)
|
||||
|
||||
result.conn = Opt.some(conn)
|
||||
result.conn = some conn
|
||||
|
||||
proc extractIds(datagram: openArray[byte]): tuple[source, dest: ngtcp2_cid] =
|
||||
let info = parseDatagram(datagram)
|
||||
|
|
|
@ -4,7 +4,7 @@ import ./closedstate
|
|||
|
||||
type
|
||||
DrainingStream* = ref object of StreamState
|
||||
stream: Opt[Stream]
|
||||
stream: Option[Stream]
|
||||
remaining: AsyncQueue[seq[byte]]
|
||||
DrainingStreamError* = object of StreamError
|
||||
|
||||
|
@ -16,22 +16,22 @@ proc newDrainingStream*(messages: AsyncQueue[seq[byte]]): DrainingStream =
|
|||
|
||||
method enter(state: DrainingStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
state.stream = some stream
|
||||
|
||||
method leave(state: DrainingStream) =
|
||||
state.stream = Opt.none(Stream)
|
||||
state.stream = Stream.none
|
||||
|
||||
method read(state: DrainingStream): Future[seq[byte]] {.async.} =
|
||||
result = state.remaining.popFirstNoWait()
|
||||
if state.remaining.empty:
|
||||
let stream = state.stream.valueOr: return
|
||||
let stream = state.stream.getOr: return
|
||||
stream.switch(newClosedStream())
|
||||
|
||||
method write(state: DrainingStream, bytes: seq[byte]) {.async.} =
|
||||
raise newException(DrainingStreamError, "stream is draining")
|
||||
|
||||
method close(state: DrainingStream) {.async.} =
|
||||
let stream = state.stream.valueOr: return
|
||||
let stream = state.stream.getOr: return
|
||||
stream.switch(newClosedStream())
|
||||
|
||||
method onClose(state: DrainingStream) =
|
||||
|
|
|
@ -7,7 +7,7 @@ import ./closedstate
|
|||
|
||||
type
|
||||
OpenStream* = ref object of StreamState
|
||||
stream: Opt[Stream]
|
||||
stream: Option[Stream]
|
||||
connection: Ngtcp2Connection
|
||||
incoming: AsyncQueue[seq[byte]]
|
||||
|
||||
|
@ -18,7 +18,7 @@ proc newOpenStream*(connection: Ngtcp2Connection): OpenStream =
|
|||
)
|
||||
|
||||
proc setUserData(state: OpenStream, userdata: pointer) =
|
||||
let stream = state.stream.valueOr: return
|
||||
let stream = state.stream.getOr: return
|
||||
state.connection.setStreamUserData(stream.id, userdata)
|
||||
|
||||
proc clearUserData(state: OpenStream) =
|
||||
|
@ -36,30 +36,30 @@ proc allowMoreIncomingBytes(state: OpenStream, amount: uint64) =
|
|||
|
||||
method enter(state: OpenStream, stream: Stream) =
|
||||
procCall enter(StreamState(state), stream)
|
||||
state.stream = Opt.some(stream)
|
||||
state.stream = some stream
|
||||
state.setUserData(unsafeAddr state[])
|
||||
|
||||
method leave(state: OpenStream) =
|
||||
procCall leave(StreamState(state))
|
||||
state.clearUserData()
|
||||
state.stream = Opt.none(Stream)
|
||||
state.stream = Stream.none
|
||||
|
||||
method read(state: OpenStream): Future[seq[byte]] {.async.} =
|
||||
result = await state.incoming.get()
|
||||
state.allowMoreIncomingBytes(result.len.uint64)
|
||||
|
||||
method write(state: OpenStream, bytes: seq[byte]): Future[void] =
|
||||
let stream = state.stream.valueOr:
|
||||
let stream = state.stream.getOr:
|
||||
raise newException(QuicError, "stream is closed")
|
||||
state.connection.send(stream.id, bytes)
|
||||
|
||||
method close(state: OpenStream) {.async.} =
|
||||
let stream = state.stream.valueOr: return
|
||||
let stream = state.stream.getOr: return
|
||||
state.connection.shutdownStream(stream.id)
|
||||
stream.switch(newClosedStream())
|
||||
|
||||
method onClose*(state: OpenStream) =
|
||||
let stream = state.stream.valueOr: return
|
||||
let stream = state.stream.getOr: return
|
||||
if state.incoming.empty:
|
||||
stream.switch(newClosedStream())
|
||||
else:
|
||||
|
|
|
@ -8,7 +8,7 @@ type
|
|||
outgoing*: AsyncQueue[Datagram]
|
||||
incoming*: AsyncQueue[Stream]
|
||||
handshake*: AsyncEvent
|
||||
disconnect*: Opt[proc(): Future[void] {.gcsafe, upraises: [].}]
|
||||
disconnect*: Option[proc(): Future[void] {.gcsafe, upraises: [].}]
|
||||
onNewId*: IdCallback
|
||||
onRemoveId*: IdCallback
|
||||
ConnectionState* = ref object of RootObj
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import ../basics
|
||||
|
||||
type Timeout* = ref object
|
||||
timer: Opt[TimerCallback]
|
||||
timer: Option[TimerCallback]
|
||||
onExpiry: proc () {.gcsafe, upraises:[].}
|
||||
expired: AsyncEvent
|
||||
|
||||
|
@ -9,7 +9,7 @@ proc setTimer(timeout: Timeout, moment: Moment) =
|
|||
proc onTimeout(_: pointer) =
|
||||
timeout.expired.fire()
|
||||
timeout.onExpiry()
|
||||
timeout.timer = Opt.some(setTimer(moment, onTimeout))
|
||||
timeout.timer = some setTimer(moment, onTimeout)
|
||||
|
||||
const skip = proc () = discard
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import std/options
|
||||
import pkg/unittest2
|
||||
import pkg/ngtcp2
|
||||
import pkg/stew/results
|
||||
import pkg/quic/errors
|
||||
import pkg/quic/transport/ngtcp2/native/connection
|
||||
import pkg/quic/transport/ngtcp2/native/client
|
||||
|
@ -32,5 +32,5 @@ suite "ngtcp2 transport parameters":
|
|||
settings.transport_params.active_connection_id_limit = 0
|
||||
|
||||
expect QuicError:
|
||||
let conn = connection.conn.get()
|
||||
let conn = connection.conn.unsafeGet()
|
||||
conn.setRemoteTransportParameters(settings.transport_params)
|
||||
|
|
Loading…
Reference in New Issue