Handle accept / write & fix multiple errors

This commit is contained in:
Ludovic Chenut 2022-11-03 17:27:01 +01:00
parent a9b8e2faa1
commit aa547badfd
No known key found for this signature in database
GPG Key ID: D9A59B1907F1D50C
4 changed files with 128 additions and 35 deletions

View File

@ -18,6 +18,9 @@ for file in `find ${root}/usrsctp/usrsctplib -name '*.c'`; do
compile="${compile} --compile=${file}" compile="${compile} --compile=${file}"
done done
# TODO: Make something more reliable, aka remove this LIBCFLAGS
# and put the different flags on prelude.nim depending on the
# OS we're currently on
LIBCFLAGS="$(grep "^LIBCFLAGS = " "${root}/usrsctp/Makefile" | cut -d' ' -f3- | sed 's/-D/--defines=/g')" LIBCFLAGS="$(grep "^LIBCFLAGS = " "${root}/usrsctp/Makefile" | cut -d' ' -f3- | sed 's/-D/--defines=/g')"
# generate nim wrapper with nimterop # generate nim wrapper with nimterop

View File

@ -7,8 +7,6 @@ const IPPROTO_SCTP = 132
let ta = initTAddress("127.0.0.1:4244") let ta = initTAddress("127.0.0.1:4244")
let tar = initTAddress("127.0.0.1:4242") let tar = initTAddress("127.0.0.1:4242")
proc discardFunc(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = discard
proc connOutput(address: pointer, proc connOutput(address: pointer,
buffer: pointer, buffer: pointer,
length: uint, length: uint,

View File

@ -1,6 +1,10 @@
packageName = "webrtc" packageName = "webrtc"
version = "0.0.1" version = "0.0.1"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "" description = "Webrtc stack"
licence = "" license = "MIT"
installDirs = @["usrsctp"] #installDirs = @["usrsctp"]
requires "nim >= 1.2.0",
"chronicles >= 0.10.2",
"chronos >= 3.0.6"

View File

@ -7,58 +7,146 @@
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
import tables, bitops, sequtils import tables, bitops, posix
import chronos import chronos, chronicles, stew/ranges/ptr_arith
import usrsctp import usrsctp
logScope:
topics = "webrtc sctp"
type type
SctpConnection* = ref object SctpConnection* = ref object
sctp: Sctp
udp: DatagramTransport udp: DatagramTransport
address: TransportAddress address: TransportAddress
sctpSocket: ptr socket
SctpListener* = ref object dataRecv: seq[byte]
udp: DatagramTransport
connections: Table[TransportAddress, SctpConnection]
Sctp* = ref object Sctp* = ref object
udps: seq[DatagramTransport] udp: DatagramTransport
connections: Table[TransportAddress, SctpConnection]
gotConnection: AsyncEvent
case isServer: bool
of true:
sock: ptr socket
of false:
discard
sentFuture: Future[void]
sentConnection: SctpConnection
const
IPPROTO_SCTP = 132
proc new(T: typedesc[SctpConnection], proc new(T: typedesc[SctpConnection],
sctp: Sctp,
udp: DatagramTransport, udp: DatagramTransport,
address: TransportAddress): T = address: TransportAddress,
T(udp: udp, address: address) sctpSocket: ptr socket): T =
T(sctp: sctp, udp: udp, address: address, sctpSocket: sctpSocket)
proc read(self: SctpConnection): Future[seq[byte]] = discard proc read(self: SctpConnection): Future[seq[byte]] = discard
proc write(self: SctpConnection, buf: seq[byte]) {.async.} = discard
proc write(self: SctpConnection, buf: seq[byte]) {.async.} =
self.sctp.sentConnection = self
discard self.sctpSocket.usrsctp_sendv(addr buf, buf.len.uint, nil, 0, nil, 0, SCTP_SENDV_NOINFO, 0)
await self.sctp.sentFuture
proc close(self: SctpConnection) {.async.} = discard proc close(self: SctpConnection) {.async.} = discard
proc getOrCreateConnection(self: SctpListener, proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
let e = usrsctp_get_events(sock)
echo "handleUpcall: event = ", e
if bitor(e, SCTP_EVENT_WRITE) != 0: # and not connected:
echo "connect"
#connected = true
elif bitor(e, SCTP_EVENT_READ) != 0:
echo "recv"
else:
echo "/!\\ ERROR /!\\"
proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
var
sin: Sockaddr_in
size: SockLen
ipaddress: IpAddress
port: Port
let
sctp: Sctp = cast[ptr Sctp](data)[]
sctpSocket = usrsctp_accept(sctp.sock,
cast[ptr SockAddr](addr sin),
cast[ptr SockLen](addr size))
sin.fromSockAddr(sizeof(sin).SockLen, ipaddress, port)
let address = initTAddress(ipaddress, port)
sctp.connections[address] = SctpConnection.new(sctp, sctp.udp, address, sctpSocket)
proc getOrCreateConnection(self: Sctp,
udp: DatagramTransport, udp: DatagramTransport,
address: TransportAddress): SctpConnection = address: TransportAddress): SctpConnection =
if self.connections.hasKey(address): if self.connections.hasKey(address):
return self.connections[address] return self.connections[address]
let connection = SctpConnection.new(udp, address) let
self.connections[address] = connection sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
return connection conn = SctpConnection.new(self, udp, address, sctpSocket)
var on: int = 1
doAssert 0 == usrsctp_setsockopt(conn.sctpSocket,
IPPROTO_SCTP,
SCTP_RECVRCVINFO,
addr on,
sizeof(on).SockLen)
doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1)
doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, nil)
self.connections[address] = conn
self.gotConnection.fire()
return conn
proc new(T: typedesc[SctpListener], address: TransportAddress): T = proc sendCallback(address: pointer,
let listener = T() buffer: pointer,
length: uint,
tos: uint8,
set_df: uint8): cint {.cdecl.} =
trace "sendCallback", data = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)
let sctp: ptr Sctp = cast[ptr Sctp](address)
proc testSend() {.async.} =
try:
let buf = @(buffer.makeOpenArray(byte, int(length)))
await sendTo(sctp[].udp, sctp.sentConnection.address, buf, int(length))
except CatchableError as exc:
echo "Failure: ", exc.msg
sctp.sentFuture = testSend()
proc new(T: typedesc[Sctp], port: uint16 = 9899, isServer: bool = true): T =
let
sctp = T(gotConnection: newAsyncEvent())
sctpPtr = cast[pointer](addr sctp)
proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async.} = proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async.} =
let connection = listener.getOrCreateConnection(udp, address) let
connection.receive(udp.getMessage()) msg = udp.getMessage()
let udp = newDatagramTransport(onReceive, local = address) conn = sctp.getOrCreateConnection(udp, address)
listener.udp = udp connPtr = cast[pointer](addr conn)
listener usrsctp_conninput(connPtr, addr msg[0], uint(msg.len), 0)
let
localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port))
udp = newDatagramTransport(onReceive, local = localAddr)
usrsctp_init_nothreads(port, sendCallback, nil) # TODO maybe put a debugger instead of nil
usrsctp_register_address(sctpPtr)
sctp.udp = udp
if isServer:
let sock = usrsctp_socket(AF_INET, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
sock.usrsctp_set_upcall(handleAccept, sctpPtr)
var sconn: Sockaddr_conn
sconn.sconn_family = AF_CONN
sconn.sconn_port = htons(0)
sconn.sconn_addr = nil
doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen)
doAssert 0 < usrsctp_listen(sock, 1)
return sctp
proc new(T: typedesc[Sctp]): T = proc listen(self: Sctp, address: TransportAddress): Future[SctpConnection] {.async.} =
T() while true:
if self.connections.hasKey(address):
proc listen(self: Sctp, address: TransportAddress): SctpListener = return self.connections[address]
# what should happen when adding multiple time the same address self.gotConnection.clear()
proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async.} = await self.gotConnection.wait()
discard
let udp = newDatagramTransport(onReceive, local = address)
self.udps.add(udp)
proc connect(self: Sctp): Future[SctpConnection] = discard proc connect(self: Sctp): Future[SctpConnection] = discard
proc dial(self: Sctp, address: TransportAddress): Future[SctpConnection] = discard proc dial(self: Sctp, address: TransportAddress): Future[SctpConnection] = discard