Move stream sending code to ngtcp2/connection module

This commit is contained in:
Mark Spanbroek 2020-12-17 13:04:51 +01:00 committed by markspanbroek
parent 15c212477d
commit 37e5eb0eb4
3 changed files with 38 additions and 60 deletions

View File

@ -10,6 +10,7 @@ import ../connectionid
import ./path
import ./errors
import ./timestamp
import ./pointers
type
Ngtcp2Connection* = ref object
@ -53,14 +54,23 @@ proc updateTimeout*(connection: Ngtcp2Connection) =
else:
connection.timeout.stop()
proc trySend(connection: Ngtcp2Connection): Datagram =
proc trySend(connection: Ngtcp2Connection,
streamId: int64 = -1,
messagePtr: ptr byte = nil,
messageLen: uint = 0,
written: ptr int = nil): Datagram =
var packetInfo: ngtcp2_pkt_info
let length = ngtcp2_conn_write_pkt(
let length = ngtcp2_conn_write_stream(
connection.conn,
connection.path.toPathPtr,
addr packetInfo,
addr connection.buffer[0],
connection.buffer.len.uint,
written,
0,
streamId,
messagePtr,
messageLen,
now()
)
checkResult length.cint
@ -78,6 +88,30 @@ proc send*(connection: Ngtcp2Connection) =
done = true
connection.updateTimeout()
proc send(connection: Ngtcp2Connection,
streamId: int64,
messagePtr: ptr byte,
messageLen: uint): Future[int] {.async.} =
let written = addr result
var datagram = trySend(connection, streamId, messagePtr, messageLen, written)
while datagram.data.len == 0:
connection.flowing.clear()
await connection.flowing.wait()
datagram = trySend(connection, streamId, messagePtr, messageLen, written)
connection.onSend(datagram)
connection.updateTimeout()
proc send*(connection: Ngtcp2Connection,
streamId: int64, bytes: seq[byte]) {.async.} =
var messagePtr = bytes.toUnsafePtr
var messageLen = bytes.len.uint
var done = false
while not done:
let written = await connection.send(streamId, messagePtr, messageLen)
messagePtr = messagePtr + written
messageLen = messageLen - written.uint
done = messageLen == 0
proc tryReceive(connection: Ngtcp2Connection, datagram: openArray[byte],
ecn: ECN) =
var packetInfo: ngtcp2_pkt_info

View File

@ -1,11 +1,8 @@
import pkg/chronos
import pkg/ngtcp2
import ../../../helpers/openarray
import ../../stream
import ../connection
import ../errors
import ../pointers
import ./sending
import ./closedstate
type
@ -36,17 +33,8 @@ method leave(state: OpenStream) =
method read(state: OpenStream): Future[seq[byte]] {.async.} =
result = await state.incoming.get()
method write(state: OpenStream, bytes: seq[byte]) {.async.} =
let connection = state.connection
let streamId = state.stream.id
var messagePtr = bytes.toUnsafePtr
var messageLen = bytes.len.uint
var done = false
while not done:
let written = await send(connection, streamId, messagePtr, messageLen)
messagePtr = messagePtr + written
messageLen = messageLen - written.uint
done = messageLen == 0
method write(state: OpenStream, bytes: seq[byte]): Future[void] =
state.connection.send(state.stream.id, bytes)
method close(state: OpenStream) {.async.} =
let conn = state.connection.conn

View File

@ -1,44 +0,0 @@
import pkg/chronos
import pkg/ngtcp2
import ../../../udp/datagram
import ../../../udp/congestion
import ../connection
import ../path
import ../errors
import ../timestamp
proc trySend(connection: Ngtcp2Connection,
streamId: int64,
messagePtr: ptr byte,
messageLen: uint,
written: var int): Datagram =
var packetInfo: ngtcp2_pkt_info
let length = ngtcp2_conn_write_stream(
connection.conn,
connection.path.toPathPtr,
addr packetInfo,
addr connection.buffer[0],
connection.buffer.len.uint,
addr written,
0,
streamId,
messagePtr,
messageLen,
now()
)
checkResult length.cint
let data = connection.buffer[0..<length]
let ecn = ECN(packetInfo.ecn)
Datagram(data: data, ecn: ecn)
proc send*(connection: Ngtcp2Connection,
streamId: int64,
messagePtr: ptr byte,
messageLen: uint): Future[int] {.async.} =
var datagram = trySend(connection, streamId, messagePtr, messageLen, result)
while datagram.data.len == 0:
connection.flowing.clear()
await connection.flowing.wait()
datagram = trySend(connection, streamId, messagePtr, messageLen, result)
connection.onSend(datagram)
connection.updateTimeout()