From aa547badfdabe3deac7970fd9a2a6038b8f0d6ce Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 3 Nov 2022 17:27:01 +0100 Subject: [PATCH] Handle accept / write & fix multiple errors --- build.sh | 3 + examples/sctp_st_client.nim | 2 - webrtc.nimble | 10 ++- webrtc/sctp.nim | 148 ++++++++++++++++++++++++++++-------- 4 files changed, 128 insertions(+), 35 deletions(-) diff --git a/build.sh b/build.sh index 35d0916..0ce73f2 100755 --- a/build.sh +++ b/build.sh @@ -18,6 +18,9 @@ for file in `find ${root}/usrsctp/usrsctplib -name '*.c'`; do compile="${compile} --compile=${file}" done +# TODO: Make something more reliable, aka remove this LIBCFLAGS +# and put the different flags on prelude.nim depending on the +# OS we're currently on LIBCFLAGS="$(grep "^LIBCFLAGS = " "${root}/usrsctp/Makefile" | cut -d' ' -f3- | sed 's/-D/--defines=/g')" # generate nim wrapper with nimterop diff --git a/examples/sctp_st_client.nim b/examples/sctp_st_client.nim index e7a3a15..161216c 100644 --- a/examples/sctp_st_client.nim +++ b/examples/sctp_st_client.nim @@ -7,8 +7,6 @@ const IPPROTO_SCTP = 132 let ta = initTAddress("127.0.0.1:4244") let tar = initTAddress("127.0.0.1:4242") -proc discardFunc(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = discard - proc connOutput(address: pointer, buffer: pointer, length: uint, diff --git a/webrtc.nimble b/webrtc.nimble index 008f609..b6d70e0 100644 --- a/webrtc.nimble +++ b/webrtc.nimble @@ -1,6 +1,10 @@ packageName = "webrtc" version = "0.0.1" author = "Status Research & Development GmbH" -description = "" -licence = "" -installDirs = @["usrsctp"] +description = "Webrtc stack" +license = "MIT" +#installDirs = @["usrsctp"] + +requires "nim >= 1.2.0", + "chronicles >= 0.10.2", + "chronos >= 3.0.6" diff --git a/webrtc/sctp.nim b/webrtc/sctp.nim index a194c67..bff4566 100644 --- a/webrtc/sctp.nim +++ b/webrtc/sctp.nim @@ -7,58 +7,146 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import tables, bitops, sequtils -import chronos +import tables, bitops, posix +import chronos, chronicles, stew/ranges/ptr_arith import usrsctp +logScope: + topics = "webrtc sctp" + type SctpConnection* = ref object + sctp: Sctp udp: DatagramTransport address: TransportAddress - - SctpListener* = ref object - udp: DatagramTransport - connections: Table[TransportAddress, SctpConnection] + sctpSocket: ptr socket + dataRecv: seq[byte] Sctp* = ref object - udps: seq[DatagramTransport] + udp: DatagramTransport + connections: Table[TransportAddress, SctpConnection] + gotConnection: AsyncEvent + case isServer: bool + of true: + sock: ptr socket + of false: + discard + sentFuture: Future[void] + sentConnection: SctpConnection + +const + IPPROTO_SCTP = 132 proc new(T: typedesc[SctpConnection], + sctp: Sctp, udp: DatagramTransport, - address: TransportAddress): T = - T(udp: udp, address: address) + address: TransportAddress, + sctpSocket: ptr socket): T = + T(sctp: sctp, udp: udp, address: address, sctpSocket: sctpSocket) proc read(self: SctpConnection): Future[seq[byte]] = discard -proc write(self: SctpConnection, buf: seq[byte]) {.async.} = discard + +proc write(self: SctpConnection, buf: seq[byte]) {.async.} = + self.sctp.sentConnection = self + discard self.sctpSocket.usrsctp_sendv(addr buf, buf.len.uint, nil, 0, nil, 0, SCTP_SENDV_NOINFO, 0) + await self.sctp.sentFuture + proc close(self: SctpConnection) {.async.} = discard -proc getOrCreateConnection(self: SctpListener, +proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = + let e = usrsctp_get_events(sock) + echo "handleUpcall: event = ", e + if bitor(e, SCTP_EVENT_WRITE) != 0: # and not connected: + echo "connect" + #connected = true + elif bitor(e, SCTP_EVENT_READ) != 0: + echo "recv" + else: + echo "/!\\ ERROR /!\\" + +proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = + var + sin: Sockaddr_in + size: SockLen + ipaddress: IpAddress + port: Port + let + sctp: Sctp = cast[ptr Sctp](data)[] + sctpSocket = usrsctp_accept(sctp.sock, + cast[ptr SockAddr](addr sin), + cast[ptr SockLen](addr size)) + sin.fromSockAddr(sizeof(sin).SockLen, ipaddress, port) + let address = initTAddress(ipaddress, port) + sctp.connections[address] = SctpConnection.new(sctp, sctp.udp, address, sctpSocket) + +proc getOrCreateConnection(self: Sctp, udp: DatagramTransport, address: TransportAddress): SctpConnection = if self.connections.hasKey(address): return self.connections[address] - let connection = SctpConnection.new(udp, address) - self.connections[address] = connection - return connection + let + sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) + conn = SctpConnection.new(self, udp, address, sctpSocket) + var on: int = 1 + doAssert 0 == usrsctp_setsockopt(conn.sctpSocket, + IPPROTO_SCTP, + SCTP_RECVRCVINFO, + addr on, + sizeof(on).SockLen) + doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1) + doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, nil) + self.connections[address] = conn + self.gotConnection.fire() + return conn -proc new(T: typedesc[SctpListener], address: TransportAddress): T = - let listener = T() +proc sendCallback(address: pointer, + buffer: pointer, + length: uint, + tos: uint8, + set_df: uint8): cint {.cdecl.} = + trace "sendCallback", data = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND) + let sctp: ptr Sctp = cast[ptr Sctp](address) + proc testSend() {.async.} = + try: + let buf = @(buffer.makeOpenArray(byte, int(length))) + await sendTo(sctp[].udp, sctp.sentConnection.address, buf, int(length)) + except CatchableError as exc: + echo "Failure: ", exc.msg + sctp.sentFuture = testSend() + +proc new(T: typedesc[Sctp], port: uint16 = 9899, isServer: bool = true): T = + let + sctp = T(gotConnection: newAsyncEvent()) + sctpPtr = cast[pointer](addr sctp) proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async.} = - let connection = listener.getOrCreateConnection(udp, address) - connection.receive(udp.getMessage()) - let udp = newDatagramTransport(onReceive, local = address) - listener.udp = udp - listener + let + msg = udp.getMessage() + conn = sctp.getOrCreateConnection(udp, address) + connPtr = cast[pointer](addr conn) + usrsctp_conninput(connPtr, addr msg[0], uint(msg.len), 0) + let + localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port)) + udp = newDatagramTransport(onReceive, local = localAddr) + usrsctp_init_nothreads(port, sendCallback, nil) # TODO maybe put a debugger instead of nil + usrsctp_register_address(sctpPtr) + sctp.udp = udp + if isServer: + let sock = usrsctp_socket(AF_INET, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) + sock.usrsctp_set_upcall(handleAccept, sctpPtr) + var sconn: Sockaddr_conn + sconn.sconn_family = AF_CONN + sconn.sconn_port = htons(0) + sconn.sconn_addr = nil + doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen) + doAssert 0 < usrsctp_listen(sock, 1) + return sctp -proc new(T: typedesc[Sctp]): T = - T() - -proc listen(self: Sctp, address: TransportAddress): SctpListener = - # what should happen when adding multiple time the same address - proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async.} = - discard - let udp = newDatagramTransport(onReceive, local = address) - self.udps.add(udp) +proc listen(self: Sctp, address: TransportAddress): Future[SctpConnection] {.async.} = + while true: + if self.connections.hasKey(address): + return self.connections[address] + self.gotConnection.clear() + await self.gotConnection.wait() proc connect(self: Sctp): Future[SctpConnection] = discard proc dial(self: Sctp, address: TransportAddress): Future[SctpConnection] = discard