Write long messages to streams
This commit is contained in:
parent
edaffcfcf6
commit
7265d6ef70
|
@ -8,7 +8,7 @@ import ngtcp2/streams
|
|||
export Connection
|
||||
export newClientConnection
|
||||
export newServerConnection
|
||||
export receive, send
|
||||
export receive, send, sendLoop
|
||||
export isHandshakeCompleted
|
||||
export handshake
|
||||
export Stream
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
proc `+`*[T](p: ptr T, a: int): ptr T =
|
||||
cast[ptr T](cast[ByteAddress](p) + ByteAddress(a))
|
|
@ -7,6 +7,7 @@ import ../congestion
|
|||
import connection
|
||||
import errors
|
||||
import path
|
||||
import pointers
|
||||
|
||||
type Stream* = object
|
||||
id: int64
|
||||
|
@ -19,7 +20,7 @@ proc openStream*(connection: Connection): Stream =
|
|||
proc close*(stream: Stream) =
|
||||
checkResult ngtcp2_conn_shutdown_stream(stream.connection.conn, stream.id, 0)
|
||||
|
||||
proc write*(stream: Stream, message: seq[byte]) {.async.} =
|
||||
proc trySend(stream: Stream, messagePtr: ptr byte, messageLen: uint, written: var int): Datagram =
|
||||
var packetInfo: ngtcp2_pkt_info
|
||||
let length = ngtcp2_conn_write_stream(
|
||||
stream.connection.conn,
|
||||
|
@ -27,19 +28,36 @@ proc write*(stream: Stream, message: seq[byte]) {.async.} =
|
|||
addr packetInfo,
|
||||
addr stream.connection.buffer[0],
|
||||
stream.connection.buffer.len.uint,
|
||||
nil,
|
||||
addr written,
|
||||
0,
|
||||
stream.id,
|
||||
message.toUnsafePtr,
|
||||
message.len.uint,
|
||||
messagePtr,
|
||||
messageLen,
|
||||
getMonoTime().ticks.uint
|
||||
)
|
||||
checkResult length.cint
|
||||
let data = stream.connection.buffer[0..<length]
|
||||
let ecn = ECN(packetInfo.ecn)
|
||||
let datagram = Datagram(data: data, ecn: ecn)
|
||||
Datagram(data: data, ecn: ecn)
|
||||
|
||||
proc send(stream: Stream, messagePtr: ptr byte, messageLen: uint): Future[int] {.async.} =
|
||||
var datagram = stream.trySend(messagePtr, messageLen, result)
|
||||
while datagram.data.len == 0:
|
||||
stream.connection.flowing.clear()
|
||||
await stream.connection.flowing.wait()
|
||||
datagram = stream.trySend(messagePtr, messageLen, result)
|
||||
await stream.connection.outgoing.put(datagram)
|
||||
|
||||
proc write*(stream: Stream, message: seq[byte]) {.async.} =
|
||||
var messagePtr = message.toUnsafePtr
|
||||
var messageLen = message.len.uint
|
||||
var done = false
|
||||
while not done:
|
||||
let written = await stream.send(messagePtr, messageLen)
|
||||
messagePtr = messagePtr + written
|
||||
messageLen = messageLen - written.uint
|
||||
done = messageLen == 0
|
||||
|
||||
proc receiveStreamData*(connection: ptr ngtcp2_conn, flags: uint32, stream_id: int64, offset: uint64, data: ptr uint8, datalen: uint, user_data: pointer, stream_user_data: pointer): cint{.cdecl.} =
|
||||
checkResult connection.ngtcp2_conn_extend_max_stream_offset(stream_id, datalen)
|
||||
connection.ngtcp2_conn_extend_max_offset(datalen)
|
||||
|
|
|
@ -35,6 +35,10 @@ proc send*(connection: Connection) {.async.} =
|
|||
datagram = connection.trySend()
|
||||
await connection.outgoing.put(datagram)
|
||||
|
||||
proc sendLoop*(connection: Connection) {.async.} =
|
||||
while true:
|
||||
await connection.send()
|
||||
|
||||
proc receive*(connection: Connection, datagram: DatagramBuffer, ecn = ecnNonCapable) =
|
||||
var packetInfo: ngtcp2_pkt_info
|
||||
packetInfo.ecn = ecn.uint32
|
||||
|
|
|
@ -2,10 +2,22 @@ import chronos
|
|||
import quic
|
||||
import addresses
|
||||
|
||||
proc networkLoop(source, destination: Connection) {.async.} =
|
||||
type Counter* = ref object
|
||||
count*: int
|
||||
|
||||
proc networkLoop*(source, destination: Connection, counter = Counter()) {.async.} =
|
||||
while true:
|
||||
let datagram = await source.outgoing.get()
|
||||
destination.receive(datagram)
|
||||
inc counter.count
|
||||
|
||||
proc simulateNetwork*(a, b: Connection, messageCounter = Counter()) {.async.} =
|
||||
await allFutures(
|
||||
networkLoop(a, b, messageCounter),
|
||||
networkLoop(b, a, messageCounter),
|
||||
sendLoop(a),
|
||||
sendLoop(b)
|
||||
)
|
||||
|
||||
proc performHandshake*: Future[tuple[client, server: Connection]] {.async.} =
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import std/unittest
|
||||
import std/sequtils
|
||||
import chronos
|
||||
import quic
|
||||
import ../helpers/asynctest
|
||||
|
@ -46,3 +47,15 @@ suite "streams":
|
|||
|
||||
expect IOError:
|
||||
await stream.write(@[1'u8, 2'u8, 3'u8])
|
||||
|
||||
asynctest "writes long messages to stream":
|
||||
let messageCounter = Counter()
|
||||
let (client, server) = await performHandshake()
|
||||
let simulation = simulateNetwork(client, server, messageCounter)
|
||||
defer: simulation.cancel()
|
||||
|
||||
let stream = client.openStream()
|
||||
let message = repeat(42'u8, 100 * sizeof(client.buffer))
|
||||
await stream.write(message)
|
||||
|
||||
check messageCounter.count > 100
|
||||
|
|
Loading…
Reference in New Issue