add asyncqueue & minor fixes

This commit is contained in:
Ludovic Chenut 2023-10-06 17:56:15 +02:00
parent d4b5b420a2
commit 397c84238a
No known key found for this signature in database
GPG Key ID: D9A59B1907F1D50C
2 changed files with 17 additions and 30 deletions

View File

@ -13,40 +13,33 @@ import ../udp_connection, stun
type type
StunConn* = ref object StunConn* = ref object
conn: UdpConn conn: UdpConn
address: TransportAddress laddr: TransportAddress
recvData: seq[(seq[byte], TransportAddress)] dataRecv: AsyncQueue[(seq[byte], TransportAddress)]
recvEvent: AsyncEvent
handlesFut: Future[void] handlesFut: Future[void]
proc handles(self: StunConn) {.async.} = proc handles(self: StunConn) {.async.} =
while true: # TODO: while not self.conn.atEof() while true: # TODO: while not self.conn.atEof()
let (msg, address) = await self.conn.read() let (msg, address) = await self.conn.read()
if Stun.isMessage(msg): if Stun.isMessage(msg):
let res = Stun.getResponse(msg, self.address) let res = Stun.getResponse(msg, self.laddr)
if res.isSome(): if res.isSome():
await self.conn.write(res.get()) await self.conn.write(res.get())
else: else:
self.recvData.add((msg, address)) self.dataRecv.addLastNoWait((msg, address))
self.recvEvent.fire()
method init(self: StunConn, conn: UdpConn, address: TransportAddress) {.async.} = proc init(self: StunConn, conn: UdpConn, laddr: TransportAddress) {.async.} =
self.conn = conn self.conn = conn
self.address = address self.laddr = laddr
self.recvEvent = newAsyncEvent() self.dataRecv = newAsyncQueue()
self.handlesFut = handles() self.handlesFut = handles()
method close(self: StunConn) {.async.} = proc close(self: StunConn) {.async.} =
self.handlesFut.cancel() # check before? self.handlesFut.cancel() # check before?
self.conn.close() self.conn.close()
method write(self: StunConn, msg: seq[byte]) {.async.} = proc write(self: StunConn, msg: seq[byte]) {.async.} =
await self.conn.write(msg) await self.conn.write(msg)
method read(self: StunConn): Future[(seq[byte], TransportAddress)] {.async.} = proc read(self: StunConn): Future[(seq[byte], TransportAddress)] {.async.} =
while self.recvData.len() <= 0: return await self.dataRecv.popFirst()
self.recvEvent.clear()
await self.recvEvent.wait()
let res = self.recvData[0]
self.recvData.delete(0..0)
return res

View File

@ -16,21 +16,19 @@ logScope:
type type
UdpConn* = ref object UdpConn* = ref object
localAddress: TransportAddress laddr: TransportAddress
udp: DatagramTransport udp: DatagramTransport
recvData: seq[(seq[byte], TransportAddress)] dataRecv: AsyncQueue[(seq[byte], TransportAddress)]
recvEvent: AsyncEvent
proc init(self: UdpConn, laddr: TransportAddress) {.async.} = proc init(self: UdpConn, laddr: TransportAddress) {.async.} =
self.localAddress = laddr self.laddr = laddr
proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} = proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} =
let msg = udp.getMessage() let msg = udp.getMessage()
echo "\e[33m<UDP>\e[0;1m onReceive\e[0m: ", msg.len() echo "\e[33m<UDP>\e[0;1m onReceive\e[0m: ", msg.len()
self.recvData.add((msg, address)) self.dataRecv.addLastNoWait((msg, address))
self.recvEvent.fire()
self.recvEvent = newAsyncEvent() self.dataRecv = newAsyncQueue()
self.udp = newDatagramTransport(onReceive, local = laddr) self.udp = newDatagramTransport(onReceive, local = laddr)
proc close(self: UdpConn) {.async.} = proc close(self: UdpConn) {.async.} =
@ -44,8 +42,4 @@ proc write(self: UdpConn, msg: seq[byte]) {.async.} =
proc read(self: UdpConn): Future[(seq[byte], TransportAddress)] {.async.} = proc read(self: UdpConn): Future[(seq[byte], TransportAddress)] {.async.} =
echo "\e[33m<UDP>\e[0;1m read\e[0m" echo "\e[33m<UDP>\e[0;1m read\e[0m"
while self.recvData.len() <= 0: return await self.dataRecv.popFirst()
self.recvEvent.clear()
await self.recvEvent.wait()
result = self.recvData[0]
self.recvData.delete(0..0)