diff --git a/webrtc/stun/stun_connection.nim b/webrtc/stun/stun_connection.nim index 823d410..a00f48b 100644 --- a/webrtc/stun/stun_connection.nim +++ b/webrtc/stun/stun_connection.nim @@ -13,40 +13,33 @@ import ../udp_connection, stun type StunConn* = ref object conn: UdpConn - address: TransportAddress - recvData: seq[(seq[byte], TransportAddress)] - recvEvent: AsyncEvent + laddr: TransportAddress + dataRecv: AsyncQueue[(seq[byte], TransportAddress)] handlesFut: Future[void] proc handles(self: StunConn) {.async.} = while true: # TODO: while not self.conn.atEof() let (msg, address) = await self.conn.read() if Stun.isMessage(msg): - let res = Stun.getResponse(msg, self.address) + let res = Stun.getResponse(msg, self.laddr) if res.isSome(): await self.conn.write(res.get()) else: - self.recvData.add((msg, address)) - self.recvEvent.fire() + self.dataRecv.addLastNoWait((msg, address)) -method init(self: StunConn, conn: UdpConn, address: TransportAddress) {.async.} = +proc init(self: StunConn, conn: UdpConn, laddr: TransportAddress) {.async.} = self.conn = conn - self.address = address + self.laddr = laddr - self.recvEvent = newAsyncEvent() + self.dataRecv = newAsyncQueue() self.handlesFut = handles() -method close(self: StunConn) {.async.} = +proc close(self: StunConn) {.async.} = self.handlesFut.cancel() # check before? self.conn.close() -method write(self: StunConn, msg: seq[byte]) {.async.} = +proc write(self: StunConn, msg: seq[byte]) {.async.} = await self.conn.write(msg) -method read(self: StunConn): Future[(seq[byte], TransportAddress)] {.async.} = - while self.recvData.len() <= 0: - self.recvEvent.clear() - await self.recvEvent.wait() - let res = self.recvData[0] - self.recvData.delete(0..0) - return res +proc read(self: StunConn): Future[(seq[byte], TransportAddress)] {.async.} = + return await self.dataRecv.popFirst() diff --git a/webrtc/udp_connection.nim b/webrtc/udp_connection.nim index 53146dd..d612c10 100644 --- a/webrtc/udp_connection.nim +++ b/webrtc/udp_connection.nim @@ -16,21 +16,19 @@ logScope: type UdpConn* = ref object - localAddress: TransportAddress + laddr: TransportAddress udp: DatagramTransport - recvData: seq[(seq[byte], TransportAddress)] - recvEvent: AsyncEvent + dataRecv: AsyncQueue[(seq[byte], TransportAddress)] proc init(self: UdpConn, laddr: TransportAddress) {.async.} = - self.localAddress = laddr + self.laddr = laddr proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} = let msg = udp.getMessage() echo "\e[33m\e[0;1m onReceive\e[0m: ", msg.len() - self.recvData.add((msg, address)) - self.recvEvent.fire() + self.dataRecv.addLastNoWait((msg, address)) - self.recvEvent = newAsyncEvent() + self.dataRecv = newAsyncQueue() self.udp = newDatagramTransport(onReceive, local = laddr) proc close(self: UdpConn) {.async.} = @@ -44,8 +42,4 @@ proc write(self: UdpConn, msg: seq[byte]) {.async.} = proc read(self: UdpConn): Future[(seq[byte], TransportAddress)] {.async.} = echo "\e[33m\e[0;1m read\e[0m" - while self.recvData.len() <= 0: - self.recvEvent.clear() - await self.recvEvent.wait() - result = self.recvData[0] - self.recvData.delete(0..0) + return await self.dataRecv.popFirst()