From 472c1923ca5e3c84b17c33f636ee3806876d93b8 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 4 Nov 2022 17:28:46 +0100 Subject: [PATCH] Beginning of client/server example --- examples/sctp_client.nim | 11 ++++++ examples/sctp_server.nim | 12 ++++++ examples/sctp_st_client.nim | 77 ------------------------------------- webrtc/sctp.nim | 39 +++++++++++++------ 4 files changed, 51 insertions(+), 88 deletions(-) create mode 100644 examples/sctp_client.nim create mode 100644 examples/sctp_server.nim delete mode 100644 examples/sctp_st_client.nim diff --git a/examples/sctp_client.nim b/examples/sctp_client.nim new file mode 100644 index 0000000..9f030d8 --- /dev/null +++ b/examples/sctp_client.nim @@ -0,0 +1,11 @@ +import chronos, stew/byteutils +import ../webrtc/sctp + +proc main() {.async.} = + let + sctp = Sctp.new(port = 4242) + address = TransportAddress(initTAddress("127.0.0.1:9899")) + conn = await sctp.connect(address) + await conn.write("toto".toBytes) + +waitFor(main()) diff --git a/examples/sctp_server.nim b/examples/sctp_server.nim new file mode 100644 index 0000000..2210c53 --- /dev/null +++ b/examples/sctp_server.nim @@ -0,0 +1,12 @@ +import chronos, stew/byteutils +import ../webrtc/sctp + +proc main() {.async.} = + let + sctp = Sctp.new(isServer = true) + address = initTAddress("127.0.0.1:9899") + conn = await sctp.listen(address) + let msg = await conn.read() + echo string.fromBytes(msg) + +waitFor(main()) diff --git a/examples/sctp_st_client.nim b/examples/sctp_st_client.nim deleted file mode 100644 index 161216c..0000000 --- a/examples/sctp_st_client.nim +++ /dev/null @@ -1,77 +0,0 @@ -import bitops -import chronos, posix -import ../webrtc/usrsctp -import stew/ranges/ptr_arith - -const IPPROTO_SCTP = 132 -let ta = initTAddress("127.0.0.1:4244") -let tar = initTAddress("127.0.0.1:4242") - -proc connOutput(address: pointer, - buffer: pointer, - length: uint, - tos: uint8, - set_df: uint8): cint {.cdecl.} = - echo "====> connOutput: ", usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND) - let dg: ptr DatagramTransport = cast[ptr DatagramTransport](address) - proc testSend() {.async.} = - try: - let buf = @(buffer.makeOpenArray(byte, int(length))) - echo "START await sendTo START" - await sendTo(dg[], tar, buf, int(length)) - echo "STOP await sendTo STOP" - except CatchableError as exc: - echo "Failure: ", exc.msg - - asyncSpawn testSend() - echo "connOutput <====" - -var connected = false -proc handleUpcall(sock: ptr socket, arg: pointer, length: cint) {.cdecl.} = - let e = usrsctp_get_events(sock) - echo "handleUpcall: event = ", e - if bitor(e, SCTP_EVENT_WRITE) != 0 and not connected: - echo "connect" - connected = true - elif bitor(e, SCTP_EVENT_READ) != 0: - echo "recv" - else: - echo "/!\\ ERROR /!\\" - -proc printf(format: cstring) {.cdecl, varargs.} = echo "printf" - -proc handleEvents(dg: DatagramTransport, sock: ptr socket, sconn_addr: pointer) {.async.} = - await sleepAsync(3.seconds) - -proc main {.async, gcsafe.} = - let fut = newFuture[void]() - var p: pointer - proc clientMark(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = - var msg = transp.getMessage() - echo "Client Mark: ", usrsctp_dumppacket(addr msg[0], uint(msg.len), SCTP_DUMP_INBOUND) - usrsctp_conninput(p, addr msg[0], uint(msg.len), 0) - - var dg = newDatagramTransport(clientMark, remote=tar, local=ta) - p = cast[pointer](addr dg) - usrsctp_init_nothreads(0, connOutput, printf) - discard usrsctp_sysctl_set_sctp_ecn_enable(1) - usrsctp_register_address(p) - let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) - var on: int = 1 - doAssert 0 == usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_RECVRCVINFO, addr on, sizeof(on).SockLen) - doAssert 0 == usrsctp_set_non_blocking(sock, 1) - doAssert 0 == usrsctp_set_upcall(sock, handleUpcall, nil) - var sconn: Sockaddr_conn - sconn.sconn_family = AF_CONN - sconn.sconn_port = htons(0) - sconn.sconn_addr = nil - doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen) - sconn.sconn_family = AF_CONN - sconn.sconn_port = htons(13) - sconn.sconn_addr = p - let connErr = usrsctp_connect(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen) - doAssert 0 == connErr or errno == EINPROGRESS, ($errno) - - await handleEvents(dg, sock, sconn.sconn_addr) - -waitFor(main()) diff --git a/webrtc/sctp.nim b/webrtc/sctp.nim index bff4566..c4aa5f8 100644 --- a/webrtc/sctp.nim +++ b/webrtc/sctp.nim @@ -37,6 +37,8 @@ type const IPPROTO_SCTP = 132 +proc perror(error: cstring) {.importc, cdecl, header: "".} + proc new(T: typedesc[SctpConnection], sctp: Sctp, udp: DatagramTransport, @@ -44,14 +46,14 @@ proc new(T: typedesc[SctpConnection], sctpSocket: ptr socket): T = T(sctp: sctp, udp: udp, address: address, sctpSocket: sctpSocket) -proc read(self: SctpConnection): Future[seq[byte]] = discard +proc read*(self: SctpConnection): Future[seq[byte]] = discard -proc write(self: SctpConnection, buf: seq[byte]) {.async.} = +proc write*(self: SctpConnection, buf: seq[byte]) {.async.} = self.sctp.sentConnection = self discard self.sctpSocket.usrsctp_sendv(addr buf, buf.len.uint, nil, 0, nil, 0, SCTP_SENDV_NOINFO, 0) await self.sctp.sentFuture -proc close(self: SctpConnection) {.async.} = discard +proc close*(self: SctpConnection) {.async.} = discard proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = let e = usrsctp_get_events(sock) @@ -78,6 +80,7 @@ proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = sin.fromSockAddr(sizeof(sin).SockLen, ipaddress, port) let address = initTAddress(ipaddress, port) sctp.connections[address] = SctpConnection.new(sctp, sctp.udp, address, sctpSocket) + sctp.gotConnection.fire() proc getOrCreateConnection(self: Sctp, udp: DatagramTransport, @@ -95,8 +98,12 @@ proc getOrCreateConnection(self: Sctp, sizeof(on).SockLen) doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1) doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, nil) + var sconn: Sockaddr_conn + sconn.sconn_family = AF_CONN + sconn.sconn_port = htons(5000) + sconn.sconn_addr = nil + doAssert 0 == conn.sctpSocket.usrsctp_connect(cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn))) self.connections[address] = conn - self.gotConnection.fire() return conn proc sendCallback(address: pointer, @@ -114,7 +121,7 @@ proc sendCallback(address: pointer, echo "Failure: ", exc.msg sctp.sentFuture = testSend() -proc new(T: typedesc[Sctp], port: uint16 = 9899, isServer: bool = true): T = +proc new*(T: typedesc[Sctp], port: uint16 = 9899, isServer: bool = false): T = let sctp = T(gotConnection: newAsyncEvent()) sctpPtr = cast[pointer](addr sctp) @@ -128,25 +135,35 @@ proc new(T: typedesc[Sctp], port: uint16 = 9899, isServer: bool = true): T = localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port)) udp = newDatagramTransport(onReceive, local = localAddr) usrsctp_init_nothreads(port, sendCallback, nil) # TODO maybe put a debugger instead of nil + discard usrsctp_sysctl_set_sctp_ecn_enable(1) usrsctp_register_address(sctpPtr) sctp.udp = udp if isServer: + echo errno, " <=" let sock = usrsctp_socket(AF_INET, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) - sock.usrsctp_set_upcall(handleAccept, sctpPtr) + perror("usrsctp_socket") + var on: int = 1 + echo "=> ", errno + doAssert 0 == usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_RECVRCVINFO, addr on, SockLen(sizeof(on))) + doAssert 0 == usrsctp_set_non_blocking(sock, 1) + echo sock.usrsctp_set_upcall(handleAccept, sctpPtr) + echo errno var sconn: Sockaddr_conn sconn.sconn_family = AF_CONN - sconn.sconn_port = htons(0) + sconn.sconn_port = htons(5000) sconn.sconn_addr = nil - doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen) + echo usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn))) doAssert 0 < usrsctp_listen(sock, 1) + sctp.sock = sock return sctp -proc listen(self: Sctp, address: TransportAddress): Future[SctpConnection] {.async.} = +proc listen*(self: Sctp, address: TransportAddress): Future[SctpConnection] {.async.} = while true: if self.connections.hasKey(address): return self.connections[address] self.gotConnection.clear() await self.gotConnection.wait() -proc connect(self: Sctp): Future[SctpConnection] = discard -proc dial(self: Sctp, address: TransportAddress): Future[SctpConnection] = discard +proc connect*(self: Sctp, address: TransportAddress): Future[SctpConnection] {.async.} = + let conn = self.getOrCreateConnection(self.udp, address) + return conn