diff --git a/examples/sctp_client.nim b/examples/sctp_client.nim index 6361896..d7ed504 100644 --- a/examples/sctp_client.nim +++ b/examples/sctp_client.nim @@ -6,7 +6,9 @@ proc main() {.async.} = sctp = Sctp.new(port = 4244) address = TransportAddress(initTAddress("127.0.0.1:4242")) conn = await sctp.connect(address, sctpPort = 13) + #let msg = await conn.read() + #echo string.fromBytes(msg) await conn.write("toto".toBytes) - await sleepAsync(3.seconds) + await conn.close() waitFor(main()) diff --git a/examples/sctp_server.nim b/examples/sctp_server.nim index 42d264e..65b2038 100644 --- a/examples/sctp_server.nim +++ b/examples/sctp_server.nim @@ -5,7 +5,10 @@ proc main() {.async.} = let sctp = Sctp.new(port = 4242, isServer = true, sctpPort = 13) conn = await sctp.listen() + #await conn.write("toto".toBytes) + #await sleepAsync(3.seconds) let msg = await conn.read() - echo string.fromBytes(msg) + echo "Receive: ", string.fromBytes(msg) + await conn.close() waitFor(main()) diff --git a/webrtc/sctp.nim b/webrtc/sctp.nim index 6c3cb32..653e25d 100644 --- a/webrtc/sctp.nim +++ b/webrtc/sctp.nim @@ -7,7 +7,7 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import tables, bitops, posix, strutils +import tables, bitops, posix, strutils, sequtils import chronos, chronicles, stew/ranges/ptr_arith import usrsctp @@ -17,13 +17,21 @@ logScope: topics = "webrtc sctp" type + SctpError* = object of CatchableError + + SctpState = enum + Connecting + Connected + Closed + SctpConnection* = ref object - connected: bool + state: SctpState connectEvent: AsyncEvent sctp: Sctp udp: DatagramTransport address: TransportAddress sctpSocket: ptr socket + recvEvent: AsyncEvent dataRecv: seq[byte] Sctp* = ref object @@ -31,6 +39,7 @@ type udp: DatagramTransport connections: Table[TransportAddress, SctpConnection] gotConnection: AsyncEvent + timerHandler: Future[void] case isServer: bool of true: sock: ptr socket @@ -44,6 +53,9 @@ type const IPPROTO_SCTP = 132 +proc newSctpError(msg: string): ref SctpError = + result = newException(SctpError, msg) + template usrsctpAwait(sctp: Sctp, body: untyped): untyped = sctp.sentFuture = nil when type(body) is void: @@ -73,46 +85,71 @@ proc new(T: typedesc[SctpConnection], address: TransportAddress, sctpSocket: ptr socket): T = T(sctp: sctp, + state: Connecting, udp: udp, address: address, sctpSocket: sctpSocket, - connectEvent: AsyncEvent()) + connectEvent: AsyncEvent(), + recvEvent: AsyncEvent()) proc read*(self: SctpConnection): Future[seq[byte]] {.async.} = trace "Read" - let x: seq[byte] = @[] - await sleepAsync(1.seconds) - return x + if self.dataRecv.len == 0: + self.recvEvent.clear() + await self.recvEvent.wait() + let res = self.dataRecv + self.dataRecv = @[] + return res proc write*(self: SctpConnection, buf: seq[byte]) {.async.} = trace "Write", buf self.sctp.sentConnection = self self.sctp.sentAddress = self.address let sendvErr = self.sctp.usrsctpAwait: - self.sctpSocket.usrsctp_sendv(addr buf, buf.len.uint, + self.sctpSocket.usrsctp_sendv(addr buf[0], buf.len.uint, nil, 0, nil, 0, SCTP_SENDV_NOINFO, 0) - echo "??? ", sendvErr - await self.sctp.sentFuture - echo "!!!" proc close*(self: SctpConnection) {.async.} = - trace "Close" + self.sctp.usrsctpAwait: self.sctpSocket.usrsctp_close() proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = let events = usrsctp_get_events(sock) conn = cast[SctpConnection](data) trace "Handle Upcall", events - if not conn.connected: + if conn.state == Connecting: if bitand(events, SCTP_EVENT_ERROR) != 0: - discard - # TODO: raise smthg + warn "Cannot connect", address = conn.address + conn.state = Closed elif bitand(events, SCTP_EVENT_WRITE) != 0: - conn.connected = true - conn.connectEvent.fire() + conn.state = Connected + conn.connectEvent.fire() elif bitand(events, SCTP_EVENT_READ) != 0: - echo "recv" + var + buffer = newSeq[byte](4096) + address: Sockaddr_storage + rn: sctp_recvv_rn + addressLen = sizeof(Sockaddr_storage).SockLen + rnLen = sizeof(sctp_recvv_rn).SockLen + infotype: uint + flags: int + let n = sock.usrsctp_recvv(cast[pointer](addr buffer[0]), buffer.len.uint, + cast[ptr SockAddr](addr address), + cast[ptr SockLen](addr addressLen), + cast[pointer](addr rn), + cast[ptr SockLen](addr rnLen), + cast[ptr cuint](addr infotype), + cast[ptr cint](addr flags)) + if n < 0: + perror("usrsctp_recvv") + return + elif n > 0: + if bitand(flags, MSG_NOTIFICATION) != 0: + trace "Notification received", length = n + else: + conn.dataRecv = conn.dataRecv.concat(buffer[0..n]) + conn.recvEvent.fire() else: warn "Handle Upcall unexpected event", events @@ -126,7 +163,7 @@ proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} = let conn = SctpConnection.new(sctp, sctp.udp, sctp.sentAddress, sctpSocket) sctp.connections[sctp.sentAddress] = conn sctp.pendingConnections.add(conn) - conn.connected = true + conn.state = Connected doAssert 0 == sctpSocket.usrsctp_set_upcall(handleUpcall, cast[pointer](conn)) sctp.gotConnection.fire() @@ -156,7 +193,7 @@ proc getOrCreateConnection(self: Sctp, self.sentAddress = address let connErr = self.usrsctpAwait: conn.sctpSocket.usrsctp_connect(cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn))) - doAssert 0 == connErr or errno == EINPROGRESS, ($errno) + doAssert 0 == connErr or errno == EINPROGRESS, ($errno) # TODO raise self.connections[address] = conn return conn @@ -178,7 +215,7 @@ proc sendCallback(address: pointer, trace "Send To", address await sendTo(sctp.udp, address, buf, int(length)) except CatchableError as exc: - echo "Failure: ", exc.msg + trace "Send Failed", address, message = exc.msg sctp.sentFuture = testSend() proc new*(T: typedesc[Sctp], @@ -192,20 +229,20 @@ proc new*(T: typedesc[Sctp], msg = udp.getMessage() data = usrsctp_dumppacket(addr msg[0], uint(msg.len), SCTP_DUMP_INBOUND) if data != nil: - trace "onReceive", server = sctp.isServer, data = data.packetPretty(), length = msg.len() + if sctp.isServer: + trace "onReceive (server)", data = data.packetPretty(), length = msg.len(), address + else: + trace "onReceive (client)", data = data.packetPretty(), length = msg.len(), address usrsctp_freedumpbuffer(data) if sctp.isServer: - echo "OnReceive (server): ", sctp.connections.len sctp.sentAddress = address usrsctp_conninput(cast[pointer](sctp), addr msg[0], uint(msg.len), 0) else: - echo "OnReceive (client): ", sctp.connections.len let conn = await sctp.getOrCreateConnection(udp, address) # TODO: Sctp Port? Read on the packet and get the port? I guess? sctp.sentConnection = conn sctp.sentAddress = address - echo "=> Address ", address usrsctp_conninput(cast[pointer](sctp), addr msg[0], uint(msg.len), 0) let localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port)) @@ -213,6 +250,10 @@ proc new*(T: typedesc[Sctp], udp = newDatagramTransport(onReceive, local = laddr) trace "local address", localAddr, laddr sctp.udp = udp + sctp.timerHandler = (proc () {.async.} = + while true: + await sleepAsync(1.seconds) + usrsctp_handle_timers(1000))() # TODO: make it cleaner pls if not isServer: usrsctp_init_nothreads(0, sendCallback, printf) discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE) @@ -220,11 +261,11 @@ proc new*(T: typedesc[Sctp], usrsctp_register_address(cast[pointer](sctp)) else: usrsctp_init_nothreads(port, sendCallback, printf) - discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL.uint32) + discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE) doAssert 0 == usrsctp_sysctl_set_sctp_blackhole(2) doAssert 0 == usrsctp_sysctl_set_sctp_no_csum_on_loopback(0) + usrsctp_register_address(cast[pointer](sctp)) let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) - perror("usrsctp_socket") var on: int = 1 doAssert 0 == usrsctp_set_non_blocking(sock, 1) var sin: Sockaddr_in @@ -240,9 +281,8 @@ proc new*(T: typedesc[Sctp], proc listen*(self: Sctp): Future[SctpConnection] {.async.} = if not self.isServer: - # TODO: raise smthg - return - echo "Listening" + raise newSctpError("Not a server") + trace "Listening" if self.pendingConnections.len == 0: self.gotConnection.clear() await self.gotConnection.wait() @@ -254,5 +294,6 @@ proc connect*(self: Sctp, address: TransportAddress, sctpPort: uint16 = 5000): F trace "Connect", address let conn = await self.getOrCreateConnection(self.udp, address, sctpPort) await conn.connectEvent.wait() - if conn.connected: return conn - else: discard # TODO: raise smthg + if conn.state != Connected: + raise newSctpError("Cannot connect to " & $address) + return conn diff --git a/webrtc/usrsctp.nim b/webrtc/usrsctp.nim index 72964b6..3aa7180 100644 --- a/webrtc/usrsctp.nim +++ b/webrtc/usrsctp.nim @@ -9,7 +9,7 @@ const usrsctpInclude = root/"usrsctp"/"usrsctplib" {.passc: fmt"-I{usrsctpInclude}".} -# Generated @ 2022-11-21T15:11:52+01:00 +# Generated @ 2022-11-23T14:21:00+01:00 # Command line: # /home/lchenut/.nimble/pkgs/nimterop-0.6.13/nimterop/toast --compile=./usrsctp/usrsctplib/netinet/sctp_input.c --compile=./usrsctp/usrsctplib/netinet/sctp_asconf.c --compile=./usrsctp/usrsctplib/netinet/sctp_pcb.c --compile=./usrsctp/usrsctplib/netinet/sctp_usrreq.c --compile=./usrsctp/usrsctplib/netinet/sctp_cc_functions.c --compile=./usrsctp/usrsctplib/netinet/sctp_auth.c --compile=./usrsctp/usrsctplib/netinet/sctp_userspace.c --compile=./usrsctp/usrsctplib/netinet/sctp_output.c --compile=./usrsctp/usrsctplib/netinet/sctp_callout.c --compile=./usrsctp/usrsctplib/netinet/sctp_crc32.c --compile=./usrsctp/usrsctplib/netinet/sctp_sysctl.c --compile=./usrsctp/usrsctplib/netinet/sctp_sha1.c --compile=./usrsctp/usrsctplib/netinet/sctp_timer.c --compile=./usrsctp/usrsctplib/netinet/sctputil.c --compile=./usrsctp/usrsctplib/netinet/sctp_bsd_addr.c --compile=./usrsctp/usrsctplib/netinet/sctp_peeloff.c --compile=./usrsctp/usrsctplib/netinet/sctp_indata.c --compile=./usrsctp/usrsctplib/netinet/sctp_ss_functions.c --compile=./usrsctp/usrsctplib/user_socket.c --compile=./usrsctp/usrsctplib/netinet6/sctp6_usrreq.c --compile=./usrsctp/usrsctplib/user_mbuf.c --compile=./usrsctp/usrsctplib/user_environment.c --compile=./usrsctp/usrsctplib/user_recv_thread.c --pnim --preprocess --noHeader --defines=SCTP_PROCESS_LEVEL_LOCKS --defines=SCTP_SIMPLE_ALLOCATOR --defines=__Userspace__ --defines=STDC_HEADERS=1 --defines=HAVE_SYS_TYPES_H=1 --defines=HAVE_SYS_STAT_H=1 --defines=HAVE_STDLIB_H=1 --defines=HAVE_STRING_H=1 --defines=HAVE_MEMORY_H=1 --defines=HAVE_STRINGS_H=1 --defines=HAVE_INTTYPES_H=1 --defines=HAVE_STDINT_H=1 --defines=HAVE_UNISTD_H=1 --defines=HAVE_DLFCN_H=1 --defines=LT_OBJDIR=".libs/" --defines=SCTP_DEBUG=1 --defines=INET=1 --defines=INET6=1 --defines=HAVE_SOCKET=1 --defines=HAVE_INET_ADDR=1 --defines=HAVE_STDATOMIC_H=1 --defines=HAVE_SYS_QUEUE_H=1 --defines=HAVE_LINUX_IF_ADDR_H=1 --defines=HAVE_LINUX_RTNETLINK_H=1 --defines=HAVE_NETINET_IP_ICMP_H=1 --defines=HAVE_NET_ROUTE_H=1 --defines=_GNU_SOURCE --replace=sockaddr=SockAddr --replace=SockAddr_storage=Sockaddr_storage --replace=SockAddr_in=Sockaddr_in --replace=SockAddr_conn=Sockaddr_conn --replace=socklen_t=SockLen --includeDirs=./usrsctp/usrsctplib ./usrsctp/usrsctplib/usrsctp.h