Refacto & add a ping/pong example
This commit is contained in:
parent
41eda8a9e4
commit
d5db29a8b7
|
@ -1,14 +0,0 @@
|
|||
import chronos, stew/byteutils
|
||||
import ../webrtc/sctp
|
||||
|
||||
proc main() {.async.} =
|
||||
let
|
||||
sctp = Sctp.new(port = 4244)
|
||||
address = TransportAddress(initTAddress("127.0.0.1:4242"))
|
||||
conn = await sctp.connect(address, sctpPort = 13)
|
||||
#let msg = await conn.read()
|
||||
#echo string.fromBytes(msg)
|
||||
await conn.write("toto".toBytes)
|
||||
await conn.close()
|
||||
|
||||
waitFor(main())
|
|
@ -0,0 +1,13 @@
|
|||
import chronos, stew/byteutils
|
||||
import ../webrtc/sctp
|
||||
|
||||
proc main() {.async.} =
|
||||
let sctp = Sctp.new(port = 4244)
|
||||
let conn = await sctp.connect(initTAddress("127.0.0.1:4242"), sctpPort = 13)
|
||||
while true:
|
||||
await conn.write("ping".toBytes)
|
||||
let msg = await conn.read()
|
||||
echo "Received: ", string.fromBytes(msg)
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
waitFor(main())
|
|
@ -0,0 +1,19 @@
|
|||
import chronos, stew/byteutils
|
||||
import ../webrtc/sctp
|
||||
|
||||
proc sendPong(conn: SctpConnection) {.async.} =
|
||||
var i = 0
|
||||
while true:
|
||||
let msg = await conn.read()
|
||||
echo "Received: ", string.fromBytes(msg)
|
||||
await conn.write(("pong " & $i).toBytes)
|
||||
i.inc()
|
||||
|
||||
proc main() {.async.} =
|
||||
let sctp = Sctp.new(port = 4242)
|
||||
sctp.startServer(13)
|
||||
while true:
|
||||
let conn = await sctp.listen()
|
||||
asyncSpawn conn.sendPong()
|
||||
|
||||
waitFor(main())
|
|
@ -1,14 +0,0 @@
|
|||
import chronos, stew/byteutils
|
||||
import ../webrtc/sctp
|
||||
|
||||
proc main() {.async.} =
|
||||
let
|
||||
sctp = Sctp.new(port = 4242, isServer = true, sctpPort = 13)
|
||||
conn = await sctp.listen()
|
||||
#await conn.write("toto".toBytes)
|
||||
#await sleepAsync(3.seconds)
|
||||
let msg = await conn.read()
|
||||
echo "Receive: ", string.fromBytes(msg)
|
||||
await conn.close()
|
||||
|
||||
waitFor(main())
|
|
@ -1,78 +0,0 @@
|
|||
import bitops
|
||||
import chronos, posix
|
||||
import ../webrtc/usrsctp
|
||||
import stew/ranges/ptr_arith
|
||||
|
||||
proc printf(format: cstring) {.cdecl, varargs.} = echo "printf"
|
||||
proc perror(error: cstring) {.importc, cdecl, header: "<errno.h>".}
|
||||
const IPPROTO_SCTP = 132
|
||||
let ta = initTAddress("127.0.0.1:4244")
|
||||
let tar = initTAddress("127.0.0.1:4242")
|
||||
|
||||
proc connOutput(address: pointer,
|
||||
buffer: pointer,
|
||||
length: uint,
|
||||
tos: uint8,
|
||||
set_df: uint8): cint {.cdecl.} =
|
||||
echo "====> connOutput: ", usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)
|
||||
let dg: ptr DatagramTransport = cast[ptr DatagramTransport](address)
|
||||
proc testSend() {.async.} =
|
||||
try:
|
||||
let buf = @(buffer.makeOpenArray(byte, int(length)))
|
||||
echo "START await sendTo START"
|
||||
await sendTo(dg[], tar, buf, int(length))
|
||||
echo "STOP await sendTo STOP"
|
||||
except CatchableError as exc:
|
||||
echo "Failure: ", exc.msg
|
||||
|
||||
asyncSpawn testSend()
|
||||
echo "connOutput <===="
|
||||
|
||||
var connected = false
|
||||
proc handleUpcall(sock: ptr socket, arg: pointer, length: cint) {.cdecl.} =
|
||||
let e = usrsctp_get_events(sock)
|
||||
echo "handleUpcall: event = ", e
|
||||
if bitor(e, SCTP_EVENT_WRITE) != 0 and not connected:
|
||||
echo "connect"
|
||||
connected = true
|
||||
elif bitor(e, SCTP_EVENT_READ) != 0:
|
||||
echo "recv"
|
||||
else:
|
||||
echo "/!\\ ERROR /!\\"
|
||||
|
||||
proc handleEvents(dg: DatagramTransport, sock: ptr socket, sconn_addr: pointer) {.async.} =
|
||||
await sleepAsync(3.seconds)
|
||||
|
||||
proc main {.async, gcsafe.} =
|
||||
let fut = newFuture[void]()
|
||||
var p: pointer
|
||||
proc clientMark(transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} =
|
||||
var msg = transp.getMessage()
|
||||
echo "Client Mark: ", usrsctp_dumppacket(addr msg[0], uint(msg.len), SCTP_DUMP_INBOUND)
|
||||
usrsctp_conninput(p, addr msg[0], uint(msg.len), 0)
|
||||
|
||||
var dg = newDatagramTransport(clientMark, remote=tar, local=ta)
|
||||
p = cast[pointer](addr dg)
|
||||
usrsctp_init_nothreads(0, connOutput, printf)
|
||||
discard usrsctp_sysctl_set_sctp_ecn_enable(1)
|
||||
usrsctp_register_address(p)
|
||||
let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
|
||||
var on: int = 1
|
||||
doAssert 0 == usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_RECVRCVINFO, addr on, sizeof(on).SockLen)
|
||||
doAssert 0 == usrsctp_set_non_blocking(sock, 1)
|
||||
doAssert 0 == usrsctp_set_upcall(sock, handleUpcall, nil)
|
||||
var sconn: Sockaddr_conn
|
||||
sconn.sconn_family = AF_CONN
|
||||
sconn.sconn_port = htons(0)
|
||||
sconn.sconn_addr = nil
|
||||
doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen)
|
||||
sconn.sconn_family = AF_CONN
|
||||
sconn.sconn_port = htons(13)
|
||||
sconn.sconn_addr = p
|
||||
let connErr = usrsctp_connect(sock, cast[ptr SockAddr](addr sconn), sizeof(sconn).SockLen)
|
||||
perror("usrsctp_connect")
|
||||
doAssert 0 == connErr or errno == EINPROGRESS, ($errno)
|
||||
|
||||
await handleEvents(dg, sock, sconn.sconn_addr)
|
||||
|
||||
waitFor(main())
|
|
@ -1,7 +0,0 @@
|
|||
import chronos
|
||||
import ../webrtc/usrsctp
|
||||
|
||||
proc main() {.async.} =
|
||||
discard
|
||||
|
||||
waitFor(main())
|
106
webrtc/sctp.nim
106
webrtc/sctp.nim
|
@ -35,17 +35,13 @@ type
|
|||
dataRecv: seq[byte]
|
||||
|
||||
Sctp* = ref object
|
||||
running: bool
|
||||
udp: DatagramTransport
|
||||
connections: Table[TransportAddress, SctpConnection]
|
||||
gotConnection: AsyncEvent
|
||||
timerHandler: Future[void]
|
||||
case isServer: bool
|
||||
of true:
|
||||
sock: ptr socket
|
||||
pendingConnections: seq[SctpConnection]
|
||||
of false:
|
||||
discard
|
||||
timersHandler: Future[void]
|
||||
isServer: bool
|
||||
sockServer: ptr socket
|
||||
pendingConnections: seq[SctpConnection]
|
||||
sentFuture: Future[void]
|
||||
sentConnection: SctpConnection
|
||||
sentAddress: TransportAddress
|
||||
|
@ -69,7 +65,6 @@ template usrsctpAwait(sctp: Sctp, body: untyped): untyped =
|
|||
proc perror(error: cstring) {.importc, cdecl, header: "<errno.h>".}
|
||||
proc printf(format: cstring) {.cdecl, importc: "printf", varargs, header: "<stdio.h>", gcsafe.}
|
||||
|
||||
proc `$`(p: pointer): string = "0x" & cast[uint](p).toHex() # TODO: Delete this
|
||||
proc packetPretty(packet: cstring): string =
|
||||
let data = $packet
|
||||
let ctn = data[23..^16]
|
||||
|
@ -154,10 +149,10 @@ proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
|
|||
warn "Handle Upcall unexpected event", events
|
||||
|
||||
proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
|
||||
trace "Handle Accept", data
|
||||
trace "Handle Accept"
|
||||
let
|
||||
sctp = cast[Sctp](data)
|
||||
sctpSocket = usrsctp_accept(sctp.sock, nil, nil)
|
||||
sctpSocket = usrsctp_accept(sctp.sockServer, nil, nil)
|
||||
|
||||
doAssert 0 == sctpSocket.usrsctp_set_non_blocking(1)
|
||||
let conn = SctpConnection.new(sctp, sctp.udp, sctp.sentAddress, sctpSocket)
|
||||
|
@ -215,15 +210,47 @@ proc sendCallback(address: pointer,
|
|||
trace "Send To", address
|
||||
await sendTo(sctp.udp, address, buf, int(length))
|
||||
except CatchableError as exc:
|
||||
trace "Send Failed", address, message = exc.msg
|
||||
trace "Send Failed", message = exc.msg
|
||||
sctp.sentFuture = testSend()
|
||||
|
||||
proc new*(T: typedesc[Sctp],
|
||||
port: uint16 = 9899,
|
||||
isServer: bool = false,
|
||||
sctpPort: uint16 = 5000): T =
|
||||
proc timersHandler() {.async.} =
|
||||
while true:
|
||||
await sleepAsync(500.milliseconds)
|
||||
usrsctp_handle_timers(500)
|
||||
|
||||
proc startServer*(self: Sctp, sctpPort: uint16 = 5000) =
|
||||
if self.isServer:
|
||||
trace "Try to start the server twice"
|
||||
return
|
||||
self.isServer = true
|
||||
doAssert 0 == usrsctp_sysctl_set_sctp_blackhole(2)
|
||||
doAssert 0 == usrsctp_sysctl_set_sctp_no_csum_on_loopback(0)
|
||||
let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
|
||||
var on: int = 1
|
||||
doAssert 0 == usrsctp_set_non_blocking(sock, 1)
|
||||
var sin: Sockaddr_in
|
||||
sin.sin_family = posix.AF_INET.uint16
|
||||
sin.sin_port = htons(sctpPort)
|
||||
sin.sin_addr.s_addr = htonl(INADDR_ANY)
|
||||
doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sin), SockLen(sizeof(Sockaddr_in)))
|
||||
doAssert 0 >= usrsctp_listen(sock, 1)
|
||||
doAssert 0 == sock.usrsctp_set_upcall(handleAccept, cast[pointer](self))
|
||||
self.sockServer = sock
|
||||
|
||||
proc closeServer(self: Sctp) =
|
||||
if not self.isServer:
|
||||
trace "Try to close a client"
|
||||
return
|
||||
self.isServer = false
|
||||
let pcs = self.pendingConnections
|
||||
self.pendingConnections = @[]
|
||||
for pc in pcs:
|
||||
pc.sctpSocket.usrsctp_close()
|
||||
self.sockServer.usrsctp_close()
|
||||
|
||||
proc new*(T: typedesc[Sctp], port: uint16 = 9899): T =
|
||||
logScope: topics = "webrtc sctp"
|
||||
let sctp = T(gotConnection: newAsyncEvent(), isServer: isServer)
|
||||
let sctp = T(gotConnection: newAsyncEvent())
|
||||
proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} =
|
||||
let
|
||||
msg = udp.getMessage()
|
||||
|
@ -240,7 +267,6 @@ proc new*(T: typedesc[Sctp],
|
|||
usrsctp_conninput(cast[pointer](sctp), addr msg[0], uint(msg.len), 0)
|
||||
else:
|
||||
let conn = await sctp.getOrCreateConnection(udp, address)
|
||||
# TODO: Sctp Port? Read on the packet and get the port? I guess?
|
||||
sctp.sentConnection = conn
|
||||
sctp.sentAddress = address
|
||||
usrsctp_conninput(cast[pointer](sctp), addr msg[0], uint(msg.len), 0)
|
||||
|
@ -250,33 +276,13 @@ proc new*(T: typedesc[Sctp],
|
|||
udp = newDatagramTransport(onReceive, local = laddr)
|
||||
trace "local address", localAddr, laddr
|
||||
sctp.udp = udp
|
||||
sctp.timerHandler = (proc () {.async.} =
|
||||
while true:
|
||||
await sleepAsync(1.seconds)
|
||||
usrsctp_handle_timers(1000))() # TODO: make it cleaner pls
|
||||
if not isServer:
|
||||
usrsctp_init_nothreads(0, sendCallback, printf)
|
||||
discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE)
|
||||
discard usrsctp_sysctl_set_sctp_ecn_enable(1)
|
||||
usrsctp_register_address(cast[pointer](sctp))
|
||||
else:
|
||||
usrsctp_init_nothreads(port, sendCallback, printf)
|
||||
discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE)
|
||||
doAssert 0 == usrsctp_sysctl_set_sctp_blackhole(2)
|
||||
doAssert 0 == usrsctp_sysctl_set_sctp_no_csum_on_loopback(0)
|
||||
usrsctp_register_address(cast[pointer](sctp))
|
||||
let sock = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
|
||||
var on: int = 1
|
||||
doAssert 0 == usrsctp_set_non_blocking(sock, 1)
|
||||
var sin: Sockaddr_in
|
||||
sin.sin_family = AF_INET.uint16
|
||||
sin.sin_port = htons(sctpPort)
|
||||
sin.sin_addr.s_addr = htonl(INADDR_ANY)
|
||||
doAssert 0 == usrsctp_bind(sock, cast[ptr SockAddr](addr sin), SockLen(sizeof(Sockaddr_in)))
|
||||
doAssert 0 >= usrsctp_listen(sock, 1)
|
||||
doAssert 0 == sock.usrsctp_set_upcall(handleAccept, cast[pointer](sctp))
|
||||
sctp.sock = sock
|
||||
sctp.running = true
|
||||
sctp.timersHandler = timersHandler()
|
||||
|
||||
usrsctp_init_nothreads(port, sendCallback, printf)
|
||||
discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE)
|
||||
discard usrsctp_sysctl_set_sctp_ecn_enable(1)
|
||||
usrsctp_register_address(cast[pointer](sctp))
|
||||
|
||||
return sctp
|
||||
|
||||
proc listen*(self: Sctp): Future[SctpConnection] {.async.} =
|
||||
|
@ -290,10 +296,16 @@ proc listen*(self: Sctp): Future[SctpConnection] {.async.} =
|
|||
self.pendingConnections.delete(0)
|
||||
return res
|
||||
|
||||
proc connect*(self: Sctp, address: TransportAddress, sctpPort: uint16 = 5000): Future[SctpConnection] {.async.} =
|
||||
proc connect*(self: Sctp,
|
||||
address: TransportAddress,
|
||||
sctpPort: uint16 = 5000): Future[SctpConnection] {.async.} =
|
||||
trace "Connect", address
|
||||
let conn = await self.getOrCreateConnection(self.udp, address, sctpPort)
|
||||
await conn.connectEvent.wait()
|
||||
try:
|
||||
await conn.connectEvent.wait()
|
||||
except CancelledError as exc:
|
||||
conn.sctpSocket.usrsctp_close()
|
||||
return nil
|
||||
if conn.state != Connected:
|
||||
raise newSctpError("Cannot connect to " & $address)
|
||||
return conn
|
||||
|
|
Loading…
Reference in New Issue