diff --git a/examples/sctp_client.nim b/examples/sctp_client.nim deleted file mode 100644 index d7ed504..0000000 --- a/examples/sctp_client.nim +++ /dev/null @@ -1,14 +0,0 @@ -import chronos, stew/byteutils -import ../webrtc/sctp - -proc main() {.async.} = - let - sctp = Sctp.new(port = 4244) - address = TransportAddress(initTAddress("127.0.0.1:4242")) - conn = await sctp.connect(address, sctpPort = 13) - #let msg = await conn.read() - #echo string.fromBytes(msg) - await conn.write("toto".toBytes) - await conn.close() - -waitFor(main()) diff --git a/examples/sctp_ping.nim b/examples/sctp_ping.nim new file mode 100644 index 0000000..0194876 --- /dev/null +++ b/examples/sctp_ping.nim @@ -0,0 +1,13 @@ +import chronos, stew/byteutils +import ../webrtc/sctp + +proc main() {.async.} = + let sctp = Sctp.new(port = 4244) + let conn = await sctp.connect(initTAddress("127.0.0.1:4242"), sctpPort = 13) + while true: + await conn.write("ping".toBytes) + let msg = await conn.read() + echo "Received: ", string.fromBytes(msg) + await sleepAsync(1.seconds) + +waitFor(main()) diff --git a/examples/sctp_pong.nim b/examples/sctp_pong.nim new file mode 100644 index 0000000..de6a6a0 --- /dev/null +++ b/examples/sctp_pong.nim @@ -0,0 +1,19 @@ +import chronos, stew/byteutils +import ../webrtc/sctp + +proc sendPong(conn: SctpConnection) {.async.} = + var i = 0 + while true: + let msg = await conn.read() + echo "Received: ", string.fromBytes(msg) + await conn.write(("pong " & $i).toBytes) + i.inc() + +proc main() {.async.} = + let sctp = Sctp.new(port = 4242) + sctp.startServer(13) + while true: + let conn = await sctp.listen() + asyncSpawn conn.sendPong() + +waitFor(main()) diff --git a/examples/sctp_server.nim b/examples/sctp_server.nim deleted file mode 100644 index 65b2038..0000000 --- a/examples/sctp_server.nim +++ /dev/null @@ -1,14 +0,0 @@ -import chronos, stew/byteutils -import ../webrtc/sctp - -proc main() {.async.} = - let - sctp = Sctp.new(port = 4242, isServer = true, sctpPort = 13) - conn = await sctp.listen() - #await conn.write("toto".toBytes) - #await sleepAsync(3.seconds) - let msg = await conn.read() - echo "Receive: ", string.fromBytes(msg) - await conn.close() - -waitFor(main()) diff --git a/examples/sctp_st_client.nim b/examples/sctp_st_client.nim deleted file mode 100644 index 5077302..0000000 --- a/examples/sctp_st_client.nim +++ /dev/null @@ -1,78 +0,0 @@ -import bitops -import chronos, posix -import ../webrtc/usrsctp -import stew/ranges/ptr_arith - -proc printf(format: cstring) {.cdecl, varargs.} = echo "printf" -proc perror(error: cstring) {.importc, cdecl, header: "".} -const IPPROTO_SCTP = 132 -let ta = initTAddress("127.0.0.1:4244") -let tar = initTAddress("127.0.0.1:4242") - -proc connOutput(address: pointer, - buffer: pointer, - length: uint, - tos: uint8, - set_df: uint8): cint {.cdecl.} = - echo "====> connOutput: ", usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND) - let dg: ptr DatagramTransport = cast[ptr DatagramTransport](address) - proc testSend() {.async.} = - try: - let buf = @(buffer.makeOpenArray(byte, int(length))) - echo "START await sendTo START" - await sendTo(dg[], tar, buf, int(length)) - echo "STOP await sendTo STOP" - except CatchableError as exc: - echo "Failure: ", exc.msg - - asyncSpawn testSend() - echo "connOutput <====" - -var connected = false -proc handleUpcall(sock: ptr socket, arg: pointer, length: cint) {.cdecl.} = - let e = usrsctp_get_events(sock) - echo "handleUpcall: event = ", e - if bitor(e, SCTP_EVENT_WRITE) != 0 and not connected: - echo "connect" - connected = true - elif bitor(e, SCTP_EVENT_READ) != 0: - echo "recv" - else: - echo "/!\\ ERROR /!\\" - -proc handleEvents(dg: DatagramTransport, sock: ptr socket, sconn_addr: pointer) {.async.} = - await sleepAsync(3.seconds) - -proc main {.async, gcsafe.} = - let fut = newFuture[void]() - var p: pointer - proc clientMark(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = - var msg = transp.getMessage() - echo "Client Mark: ", usrsctp_dumppacket(addr msg[0], uint(msg.len), SCTP_DUMP_INBOUND) - usrsctp_conninput(p, addr msg[0], uint(msg.len), 0) - - var dg = newDatagramTransport(clientMark, remote=tar, local=ta) - p = cast[pointer](addr dg) - usrsctp_init_nothreads(0, connOutput, printf) - discard usrsctp_sysctl_set_sctp_ecn_enable(1) - usrsctp_register_address(p) - let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) - var on: int = 1 - doAssert 0 == usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_RECVRCVINFO, addr on, sizeof(on).SockLen) - doAssert 0 == usrsctp_set_non_blocking(sock, 1) - doAssert 0 == usrsctp_set_upcall(sock, handleUpcall, nil) - var sconn: Sockaddr_conn - sconn.sconn_family = AF_CONN - sconn.sconn_port = htons(0) - sconn.sconn_addr = nil - doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen) - sconn.sconn_family = AF_CONN - sconn.sconn_port = htons(13) - sconn.sconn_addr = p - let connErr = usrsctp_connect(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen) - perror("usrsctp_connect") - doAssert 0 == connErr or errno == EINPROGRESS, ($errno) - - await handleEvents(dg, sock, sconn.sconn_addr) - -waitFor(main()) diff --git a/examples/sctp_st_daytimeserver.nim b/examples/sctp_st_daytimeserver.nim deleted file mode 100644 index 36cfc1f..0000000 --- a/examples/sctp_st_daytimeserver.nim +++ /dev/null @@ -1,7 +0,0 @@ -import chronos -import ../webrtc/usrsctp - -proc main() {.async.} = - discard - -waitFor(main()) diff --git a/webrtc/sctp.nim b/webrtc/sctp.nim index 653e25d..d86833d 100644 --- a/webrtc/sctp.nim +++ b/webrtc/sctp.nim @@ -35,17 +35,13 @@ type dataRecv: seq[byte] Sctp* = ref object - running: bool udp: DatagramTransport connections: Table[TransportAddress, SctpConnection] gotConnection: AsyncEvent - timerHandler: Future[void] - case isServer: bool - of true: - sock: ptr socket - pendingConnections: seq[SctpConnection] - of false: - discard + timersHandler: Future[void] + isServer: bool + sockServer: ptr socket + pendingConnections: seq[SctpConnection] sentFuture: Future[void] sentConnection: SctpConnection sentAddress: TransportAddress @@ -69,7 +65,6 @@ template usrsctpAwait(sctp: Sctp, body: untyped): untyped = proc perror(error: cstring) {.importc, cdecl, header: "".} proc printf(format: cstring) {.cdecl, importc: "printf", varargs, header: "", gcsafe.} -proc `$`(p: pointer): string = "0x" & cast[uint](p).toHex() # TODO: Delete this proc packetPretty(packet: cstring): string = let data = $packet let ctn = data[23..^16] @@ -154,10 +149,10 @@ proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = warn "Handle Upcall unexpected event", events proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = - trace "Handle Accept", data + trace "Handle Accept" let sctp = cast[Sctp](data) - sctpSocket = usrsctp_accept(sctp.sock, nil, nil) + sctpSocket = usrsctp_accept(sctp.sockServer, nil, nil) doAssert 0 == sctpSocket.usrsctp_set_non_blocking(1) let conn = SctpConnection.new(sctp, sctp.udp, sctp.sentAddress, sctpSocket) @@ -215,15 +210,47 @@ proc sendCallback(address: pointer, trace "Send To", address await sendTo(sctp.udp, address, buf, int(length)) except CatchableError as exc: - trace "Send Failed", address, message = exc.msg + trace "Send Failed", message = exc.msg sctp.sentFuture = testSend() -proc new*(T: typedesc[Sctp], - port: uint16 = 9899, - isServer: bool = false, - sctpPort: uint16 = 5000): T = +proc timersHandler() {.async.} = + while true: + await sleepAsync(500.milliseconds) + usrsctp_handle_timers(500) + +proc startServer*(self: Sctp, sctpPort: uint16 = 5000) = + if self.isServer: + trace "Try to start the server twice" + return + self.isServer = true + doAssert 0 == usrsctp_sysctl_set_sctp_blackhole(2) + doAssert 0 == usrsctp_sysctl_set_sctp_no_csum_on_loopback(0) + let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) + var on: int = 1 + doAssert 0 == usrsctp_set_non_blocking(sock, 1) + var sin: Sockaddr_in + sin.sin_family = posix.AF_INET.uint16 + sin.sin_port = htons(sctpPort) + sin.sin_addr.s_addr = htonl(INADDR_ANY) + doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sin), SockLen(sizeof(Sockaddr_in))) + doAssert 0 >= usrsctp_listen(sock, 1) + doAssert 0 == sock.usrsctp_set_upcall(handleAccept, cast[pointer](self)) + self.sockServer = sock + +proc closeServer(self: Sctp) = + if not self.isServer: + trace "Try to close a client" + return + self.isServer = false + let pcs = self.pendingConnections + self.pendingConnections = @[] + for pc in pcs: + pc.sctpSocket.usrsctp_close() + self.sockServer.usrsctp_close() + +proc new*(T: typedesc[Sctp], port: uint16 = 9899): T = logScope: topics = "webrtc sctp" - let sctp = T(gotConnection: newAsyncEvent(), isServer: isServer) + let sctp = T(gotConnection: newAsyncEvent()) proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} = let msg = udp.getMessage() @@ -240,7 +267,6 @@ proc new*(T: typedesc[Sctp], usrsctp_conninput(cast[pointer](sctp), addr msg[0], uint(msg.len), 0) else: let conn = await sctp.getOrCreateConnection(udp, address) - # TODO: Sctp Port? Read on the packet and get the port? I guess? sctp.sentConnection = conn sctp.sentAddress = address usrsctp_conninput(cast[pointer](sctp), addr msg[0], uint(msg.len), 0) @@ -250,33 +276,13 @@ proc new*(T: typedesc[Sctp], udp = newDatagramTransport(onReceive, local = laddr) trace "local address", localAddr, laddr sctp.udp = udp - sctp.timerHandler = (proc () {.async.} = - while true: - await sleepAsync(1.seconds) - usrsctp_handle_timers(1000))() # TODO: make it cleaner pls - if not isServer: - usrsctp_init_nothreads(0, sendCallback, printf) - discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE) - discard usrsctp_sysctl_set_sctp_ecn_enable(1) - usrsctp_register_address(cast[pointer](sctp)) - else: - usrsctp_init_nothreads(port, sendCallback, printf) - discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE) - doAssert 0 == usrsctp_sysctl_set_sctp_blackhole(2) - doAssert 0 == usrsctp_sysctl_set_sctp_no_csum_on_loopback(0) - usrsctp_register_address(cast[pointer](sctp)) - let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) - var on: int = 1 - doAssert 0 == usrsctp_set_non_blocking(sock, 1) - var sin: Sockaddr_in - sin.sin_family = AF_INET.uint16 - sin.sin_port = htons(sctpPort) - sin.sin_addr.s_addr = htonl(INADDR_ANY) - doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sin), SockLen(sizeof(Sockaddr_in))) - doAssert 0 >= usrsctp_listen(sock, 1) - doAssert 0 == sock.usrsctp_set_upcall(handleAccept, cast[pointer](sctp)) - sctp.sock = sock - sctp.running = true + sctp.timersHandler = timersHandler() + + usrsctp_init_nothreads(port, sendCallback, printf) + discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE) + discard usrsctp_sysctl_set_sctp_ecn_enable(1) + usrsctp_register_address(cast[pointer](sctp)) + return sctp proc listen*(self: Sctp): Future[SctpConnection] {.async.} = @@ -290,10 +296,16 @@ proc listen*(self: Sctp): Future[SctpConnection] {.async.} = self.pendingConnections.delete(0) return res -proc connect*(self: Sctp, address: TransportAddress, sctpPort: uint16 = 5000): Future[SctpConnection] {.async.} = +proc connect*(self: Sctp, + address: TransportAddress, + sctpPort: uint16 = 5000): Future[SctpConnection] {.async.} = trace "Connect", address let conn = await self.getOrCreateConnection(self.udp, address, sctpPort) - await conn.connectEvent.wait() + try: + await conn.connectEvent.wait() + except CancelledError as exc: + conn.sctpSocket.usrsctp_close() + return nil if conn.state != Connected: raise newSctpError("Cannot connect to " & $address) return conn