fix small errors on sctp
This commit is contained in:
parent
dee08191cf
commit
6391a3f2e5
|
@ -58,6 +58,7 @@ type
|
|||
remoteCert: seq[byte]
|
||||
|
||||
proc dtlsSend*(ctx: pointer, buf: ptr byte, len: uint): cint {.cdecl.} =
|
||||
trace "dtls send", len
|
||||
var self = cast[DtlsConn](ctx)
|
||||
var toWrite = newSeq[byte](len)
|
||||
if len > 0:
|
||||
|
@ -66,6 +67,7 @@ proc dtlsSend*(ctx: pointer, buf: ptr byte, len: uint): cint {.cdecl.} =
|
|||
result = len.cint
|
||||
|
||||
proc dtlsRecv*(ctx: pointer, buf: ptr byte, len: uint): cint {.cdecl.} =
|
||||
trace "dtls receive", len
|
||||
var
|
||||
self = cast[DtlsConn](ctx)
|
||||
dataRecv = self.dataRecv.popFirstNoWait()
|
||||
|
|
170
webrtc/sctp.nim
170
webrtc/sctp.nim
|
@ -51,6 +51,7 @@ type
|
|||
address: TransportAddress
|
||||
sctpSocket: ptr socket
|
||||
dataRecv: AsyncQueue[SctpMessage]
|
||||
sentFuture: Future[void]
|
||||
|
||||
Sctp* = ref object
|
||||
dtls: Dtls
|
||||
|
@ -62,9 +63,9 @@ type
|
|||
sockServer: ptr socket
|
||||
pendingConnections: seq[SctpConn]
|
||||
pendingConnections2: Table[SockAddr, SctpConn]
|
||||
sentFuture: Future[void]
|
||||
sentConnection: SctpConn
|
||||
sentAddress: TransportAddress
|
||||
sentFuture: Future[void]
|
||||
|
||||
const
|
||||
IPPROTO_SCTP = 132
|
||||
|
@ -72,14 +73,14 @@ const
|
|||
proc newSctpError(msg: string): ref SctpError =
|
||||
result = newException(SctpError, msg)
|
||||
|
||||
template usrsctpAwait(sctp: Sctp, body: untyped): untyped =
|
||||
sctp.sentFuture = nil
|
||||
template usrsctpAwait(self: SctpConn|Sctp, body: untyped): untyped =
|
||||
self.sentFuture = nil
|
||||
when type(body) is void:
|
||||
body
|
||||
if sctp.sentFuture != nil: await sctp.sentFuture
|
||||
if self.sentFuture != nil: await self.sentFuture
|
||||
else:
|
||||
let res = body
|
||||
if sctp.sentFuture != nil: await sctp.sentFuture
|
||||
if self.sentFuture != nil: await self.sentFuture
|
||||
res
|
||||
|
||||
proc perror(error: cstring) {.importc, cdecl, header: "<errno.h>".}
|
||||
|
@ -109,8 +110,9 @@ proc new(T: typedesc[SctpConn],
|
|||
dataRecv: newAsyncQueue[SctpMessage]()
|
||||
)
|
||||
|
||||
proc new(T: typedesc[SctpConn], conn: DtlsConn): T =
|
||||
proc new(T: typedesc[SctpConn], conn: DtlsConn, sctp: Sctp): T =
|
||||
T(conn: conn,
|
||||
sctp: sctp,
|
||||
state: Connecting,
|
||||
connectEvent: AsyncEvent(),
|
||||
acceptEvent: AsyncEvent(),
|
||||
|
@ -146,7 +148,7 @@ proc write*(
|
|||
), cuint(SCTP_SENDV_SNDINFO))
|
||||
else:
|
||||
(default(sctp_sndinfo), cuint(SCTP_SENDV_NOINFO))
|
||||
sendvErr = self.sctp.usrsctpAwait:
|
||||
sendvErr = self.usrsctpAwait:
|
||||
self.sctpSocket.usrsctp_sendv(unsafeAddr buf[0], buf.len.uint,
|
||||
nil, 0, unsafeAddr sendInfo, sizeof(sendInfo).SockLen,
|
||||
infoType, 0)
|
||||
|
@ -155,7 +157,7 @@ proc write*(self: SctpConn, s: string) {.async.} =
|
|||
await self.write(s.toBytes())
|
||||
|
||||
proc close*(self: SctpConn) {.async.} =
|
||||
self.sctp.usrsctpAwait: self.sctpSocket.usrsctp_close()
|
||||
self.usrsctpAwait: self.sctpSocket.usrsctp_close()
|
||||
|
||||
proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
|
||||
let
|
||||
|
@ -216,43 +218,42 @@ proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
|
|||
sctp = cast[Sctp](data)
|
||||
sctpSocket = usrsctp_accept(sctp.sockServer, cast[ptr SockAddr](addr sconn), addr slen)
|
||||
|
||||
# echo cast[uint64](sconn.sconn_addr)
|
||||
doAssert 0 == sctpSocket.usrsctp_set_non_blocking(1)
|
||||
let conn = cast[SctpConn](sconn.sconn_addr)
|
||||
conn.state = Connected
|
||||
conn.acceptEvent.fire()
|
||||
|
||||
proc getOrCreateConnection(self: Sctp,
|
||||
udp: DatagramTransport,
|
||||
address: TransportAddress,
|
||||
sctpPort: uint16 = 5000): Future[SctpConn] {.async.} =
|
||||
#TODO remove the = 5000
|
||||
if self.connections.hasKey(address):
|
||||
return self.connections[address]
|
||||
trace "Create Connection", address
|
||||
let
|
||||
sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
|
||||
conn = SctpConn.new(self, udp, address, sctpSocket)
|
||||
var on: int = 1
|
||||
doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP,
|
||||
SCTP_RECVRCVINFO,
|
||||
addr on,
|
||||
sizeof(on).SockLen)
|
||||
doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1)
|
||||
doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, cast[pointer](conn))
|
||||
var sconn: Sockaddr_conn
|
||||
sconn.sconn_family = AF_CONN
|
||||
sconn.sconn_port = htons(sctpPort)
|
||||
sconn.sconn_addr = cast[pointer](self)
|
||||
self.sentConnection = conn
|
||||
self.sentAddress = address
|
||||
let connErr = self.usrsctpAwait:
|
||||
conn.sctpSocket.usrsctp_connect(cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn)))
|
||||
doAssert 0 == connErr or errno == posix.EINPROGRESS, ($errno)
|
||||
self.connections[address] = conn
|
||||
return conn
|
||||
# proc getOrCreateConnection(self: Sctp,
|
||||
# udp: DatagramTransport,
|
||||
# address: TransportAddress,
|
||||
# sctpPort: uint16 = 5000): Future[SctpConn] {.async.} =
|
||||
# #TODO remove the = 5000
|
||||
# if self.connections.hasKey(address):
|
||||
# return self.connections[address]
|
||||
# trace "Create Connection", address
|
||||
# let
|
||||
# sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
|
||||
# conn = SctpConn.new(self, udp, address, sctpSocket)
|
||||
# var on: int = 1
|
||||
# doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP,
|
||||
# SCTP_RECVRCVINFO,
|
||||
# addr on,
|
||||
# sizeof(on).SockLen)
|
||||
# doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1)
|
||||
# doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, cast[pointer](conn))
|
||||
# var sconn: Sockaddr_conn
|
||||
# sconn.sconn_family = AF_CONN
|
||||
# sconn.sconn_port = htons(sctpPort)
|
||||
# sconn.sconn_addr = cast[pointer](self)
|
||||
# self.sentConnection = conn
|
||||
# self.sentAddress = address
|
||||
# let connErr = self.usrsctpAwait:
|
||||
# conn.sctpSocket.usrsctp_connect(cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn)))
|
||||
# doAssert 0 == connErr or errno == posix.EINPROGRESS, ($errno)
|
||||
# self.connections[address] = conn
|
||||
# return conn
|
||||
|
||||
proc sendCallback(address: pointer,
|
||||
proc sendCallback(ctx: pointer,
|
||||
buffer: pointer,
|
||||
length: uint,
|
||||
tos: uint8,
|
||||
|
@ -261,17 +262,15 @@ proc sendCallback(address: pointer,
|
|||
if data != nil:
|
||||
trace "sendCallback", data = data.packetPretty(), length
|
||||
usrsctp_freedumpbuffer(data)
|
||||
let sctp = cast[Sctp](address)
|
||||
let sctpConn = cast[SctpConn](ctx)
|
||||
proc testSend() {.async.} =
|
||||
try:
|
||||
let
|
||||
buf = @(buffer.makeOpenArray(byte, int(length)))
|
||||
address = sctp.sentAddress
|
||||
trace "Send To", address
|
||||
await sendTo(sctp.udp, address, buf, int(length))
|
||||
let buf = @(buffer.makeOpenArray(byte, int(length)))
|
||||
trace "Send To", address = sctpConn.address
|
||||
await sctpConn.conn.write(buf)
|
||||
except CatchableError as exc:
|
||||
trace "Send Failed", message = exc.msg
|
||||
sctp.sentFuture = testSend()
|
||||
sctpConn.sentFuture = testSend()
|
||||
|
||||
proc timersHandler() {.async.} =
|
||||
while true:
|
||||
|
@ -300,42 +299,42 @@ proc new*(T: typedesc[Sctp], dtls: Dtls, laddr: TransportAddress): T =
|
|||
usrsctp_register_address(cast[pointer](sctp))
|
||||
return sctp
|
||||
|
||||
proc new*(T: typedesc[Sctp], port: uint16 = 9899): T =
|
||||
logScope: topics = "webrtc sctp"
|
||||
let sctp = T(gotConnection: newAsyncEvent())
|
||||
proc onReceive(udp: DatagramTransport, raddr: TransportAddress) {.async, gcsafe.} =
|
||||
let
|
||||
msg = udp.getMessage()
|
||||
data = usrsctp_dumppacket(unsafeAddr msg[0], uint(msg.len), SCTP_DUMP_INBOUND)
|
||||
if data != nil:
|
||||
if sctp.isServer:
|
||||
trace "onReceive (server)", data = data.packetPretty(), length = msg.len(), raddr
|
||||
else:
|
||||
trace "onReceive (client)", data = data.packetPretty(), length = msg.len(), raddr
|
||||
usrsctp_freedumpbuffer(data)
|
||||
|
||||
if sctp.isServer:
|
||||
sctp.sentAddress = raddr
|
||||
usrsctp_conninput(cast[pointer](sctp), unsafeAddr msg[0], uint(msg.len), 0)
|
||||
else:
|
||||
let conn = await sctp.getOrCreateConnection(udp, raddr)
|
||||
sctp.sentConnection = conn
|
||||
sctp.sentAddress = raddr
|
||||
usrsctp_conninput(cast[pointer](sctp), unsafeAddr msg[0], uint(msg.len), 0)
|
||||
let
|
||||
localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port))
|
||||
laddr = initTAddress("127.0.0.1:" & $port)
|
||||
udp = newDatagramTransport(onReceive, local = laddr)
|
||||
trace "local address", localAddr, laddr
|
||||
sctp.udp = udp
|
||||
sctp.timersHandler = timersHandler()
|
||||
|
||||
usrsctp_init_nothreads(port, sendCallback, printf)
|
||||
discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE)
|
||||
discard usrsctp_sysctl_set_sctp_ecn_enable(1)
|
||||
usrsctp_register_address(cast[pointer](sctp))
|
||||
|
||||
return sctp
|
||||
#proc new*(T: typedesc[Sctp], port: uint16 = 9899): T =
|
||||
# logScope: topics = "webrtc sctp"
|
||||
# let sctp = T(gotConnection: newAsyncEvent())
|
||||
# proc onReceive(udp: DatagramTransport, raddr: TransportAddress) {.async, gcsafe.} =
|
||||
# let
|
||||
# msg = udp.getMessage()
|
||||
# data = usrsctp_dumppacket(unsafeAddr msg[0], uint(msg.len), SCTP_DUMP_INBOUND)
|
||||
# if data != nil:
|
||||
# if sctp.isServer:
|
||||
# trace "onReceive (server)", data = data.packetPretty(), length = msg.len(), raddr
|
||||
# else:
|
||||
# trace "onReceive (client)", data = data.packetPretty(), length = msg.len(), raddr
|
||||
# usrsctp_freedumpbuffer(data)
|
||||
#
|
||||
# if sctp.isServer:
|
||||
# sctp.sentAddress = raddr
|
||||
# usrsctp_conninput(cast[pointer](sctp), unsafeAddr msg[0], uint(msg.len), 0)
|
||||
# else:
|
||||
# let conn = await sctp.getOrCreateConnection(udp, raddr)
|
||||
# sctp.sentConnection = conn
|
||||
# sctp.sentAddress = raddr
|
||||
# usrsctp_conninput(cast[pointer](sctp), unsafeAddr msg[0], uint(msg.len), 0)
|
||||
# let
|
||||
# localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port))
|
||||
# laddr = initTAddress("127.0.0.1:" & $port)
|
||||
# udp = newDatagramTransport(onReceive, local = laddr)
|
||||
# trace "local address", localAddr, laddr
|
||||
# sctp.udp = udp
|
||||
# sctp.timersHandler = timersHandler()
|
||||
#
|
||||
# usrsctp_init_nothreads(port, sendCallback, printf)
|
||||
# discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE)
|
||||
# discard usrsctp_sysctl_set_sctp_ecn_enable(1)
|
||||
# usrsctp_register_address(cast[pointer](sctp))
|
||||
#
|
||||
# return sctp
|
||||
|
||||
proc stop*(self: Sctp) {.async.} =
|
||||
discard self.usrsctpAwait usrsctp_finish()
|
||||
|
@ -343,20 +342,21 @@ proc stop*(self: Sctp) {.async.} =
|
|||
|
||||
proc readLoopProc(res: SctpConn) {.async.} =
|
||||
while true:
|
||||
trace "Read Loop Proc Before"
|
||||
let
|
||||
msg = await res.conn.read()
|
||||
data = usrsctp_dumppacket(unsafeAddr msg[0], uint(msg.len), SCTP_DUMP_INBOUND)
|
||||
trace "Read Loop Proc Before", isnil=data.isNil()
|
||||
if data != nil:
|
||||
trace "Receive connection", remoteAddress = res.conn.raddr, data = data.packetPretty()
|
||||
usrsctp_freedumpbuffer(data)
|
||||
res.sctp.sentConnection = res
|
||||
usrsctp_conninput(cast[pointer](res), unsafeAddr msg[0], uint(msg.len), 0)
|
||||
|
||||
proc accept*(self: Sctp): Future[SctpConn] {.async.} =
|
||||
if not self.isServer:
|
||||
raise newSctpError("Not a server")
|
||||
let conn = await self.dtls.accept()
|
||||
var res = SctpConn.new(conn)
|
||||
res.conn = await self.dtls.accept()
|
||||
var res = SctpConn.new(await self.dtls.accept, self)
|
||||
usrsctp_register_address(cast[pointer](res))
|
||||
res.readLoop = res.readLoopProc()
|
||||
res.acceptEvent.clear()
|
||||
|
|
Loading…
Reference in New Issue