sctp/dtls client done

This commit is contained in:
Ludovic Chenut 2024-02-28 13:49:43 +01:00
parent 0fa09ba6f7
commit f6ba794281
No known key found for this signature in database
GPG Key ID: D9A59B1907F1D50C
4 changed files with 74 additions and 155 deletions

View File

@ -11,6 +11,5 @@ requires "nim >= 1.2.0",
"https://github.com/status-im/nim-binary-serialization.git", "https://github.com/status-im/nim-binary-serialization.git",
"https://github.com/status-im/nim-mbedtls.git" "https://github.com/status-im/nim-mbedtls.git"
proc runTest(filename: string) = proc runTest(filename: string) =
discard discard

View File

@ -82,10 +82,13 @@ proc init*(self: DtlsConn, conn: StunConn, laddr: TransportAddress) {.async.} =
self.dataRecv = newAsyncQueue[seq[byte]]() self.dataRecv = newAsyncQueue[seq[byte]]()
proc write*(self: DtlsConn, msg: seq[byte]) {.async.} = proc write*(self: DtlsConn, msg: seq[byte]) {.async.} =
trace "Dtls write", length = msg.len()
var buf = msg var buf = msg
# TODO: exception catching try:
discard mb_ssl_write(self.ssl, buf) let write = mb_ssl_write(self.ssl, buf)
trace "Dtls write", msgLen = msg.len(), actuallyWrote = write
except MbedTLSError as exc:
trace "Dtls write error", errorMsg = exc.msg
raise exc
proc read*(self: DtlsConn): Future[seq[byte]] {.async.} = proc read*(self: DtlsConn): Future[seq[byte]] {.async.} =
var res = newSeq[byte](8192) var res = newSeq[byte](8192)
@ -97,7 +100,7 @@ proc read*(self: DtlsConn): Future[seq[byte]] {.async.} =
if length == MBEDTLS_ERR_SSL_WANT_READ: if length == MBEDTLS_ERR_SSL_WANT_READ:
continue continue
if length < 0: if length < 0:
trace "dtls read", error = $(length.mbedtls_high_level_strerr()) trace "dtls read", error = $(length.cint.mbedtls_high_level_strerr())
res.setLen(length) res.setLen(length)
return res return res
@ -164,47 +167,18 @@ proc stop*(self: Dtls) =
self.readLoop.cancel() self.readLoop.cancel()
self.started = false self.started = false
proc serverHandshake(self: DtlsConn) {.async.} = proc dtlsHandshake(self: DtlsConn, isServer: bool) {.async.} =
var shouldRead = true var shouldRead = isServer
while self.ssl.private_state != MBEDTLS_SSL_HANDSHAKE_OVER: while self.ssl.private_state != MBEDTLS_SSL_HANDSHAKE_OVER:
if shouldRead: if shouldRead:
case self.raddr.family if isServer:
of AddressFamily.IPv4: case self.raddr.family
mb_ssl_set_client_transport_id(self.ssl, self.raddr.address_v4) of AddressFamily.IPv4:
of AddressFamily.IPv6: mb_ssl_set_client_transport_id(self.ssl, self.raddr.address_v4)
mb_ssl_set_client_transport_id(self.ssl, self.raddr.address_v6) of AddressFamily.IPv6:
else: mb_ssl_set_client_transport_id(self.ssl, self.raddr.address_v6)
raise newException(DtlsError, "Remote address isn't an IP address") else:
let tmp = await self.dataRecv.popFirst() raise newException(DtlsError, "Remote address isn't an IP address")
self.dataRecv.addFirstNoWait(tmp)
self.sendFuture = nil
let res = mb_ssl_handshake_step(self.ssl)
if not self.sendFuture.isNil(): await self.sendFuture
shouldRead = false
if res == MBEDTLS_ERR_SSL_WANT_WRITE:
continue
elif res == MBEDTLS_ERR_SSL_WANT_READ or
self.ssl.private_state == MBEDTLS_SSL_CLIENT_KEY_EXCHANGE:
shouldRead = true
continue
elif res == MBEDTLS_ERR_SSL_HELLO_VERIFY_REQUIRED:
mb_ssl_session_reset(self.ssl)
shouldRead = true
continue
elif res != 0:
raise newException(DtlsError, $(res.mbedtls_high_level_strerr()))
proc clientHandshake(self: DtlsConn) {.async.} =
var shouldRead = false
while self.ssl.private_state != MBEDTLS_SSL_HANDSHAKE_OVER:
if shouldRead:
case self.raddr.family
of AddressFamily.IPv4:
mb_ssl_set_client_transport_id(self.ssl, self.raddr.address_v4)
of AddressFamily.IPv6:
mb_ssl_set_client_transport_id(self.ssl, self.raddr.address_v6)
else:
raise newException(DtlsError, "Remote address isn't an IP address")
let tmp = await self.dataRecv.popFirst() let tmp = await self.dataRecv.popFirst()
self.dataRecv.addFirstNoWait(tmp) self.dataRecv.addFirstNoWait(tmp)
self.sendFuture = nil self.sendFuture = nil
@ -214,8 +188,6 @@ proc clientHandshake(self: DtlsConn) {.async.} =
if res == MBEDTLS_ERR_SSL_WANT_WRITE: if res == MBEDTLS_ERR_SSL_WANT_WRITE:
continue continue
elif res == MBEDTLS_ERR_SSL_WANT_READ: elif res == MBEDTLS_ERR_SSL_WANT_READ:
# or self.ssl.private_state == MBEDTLS_SSL_SERVER_KEY_EXCHANGE:
# TODO: Might need to check directly on mbedtls C source
shouldRead = true shouldRead = true
continue continue
elif res == MBEDTLS_ERR_SSL_HELLO_VERIFY_REQUIRED: elif res == MBEDTLS_ERR_SSL_HELLO_VERIFY_REQUIRED:
@ -284,7 +256,7 @@ proc accept*(self: Dtls): Future[DtlsConn] {.async.} =
res.raddr = raddr res.raddr = raddr
res.dataRecv.addLastNoWait(buf) res.dataRecv.addLastNoWait(buf)
self.connections[raddr] = res self.connections[raddr] = res
await res.serverHandshake() await res.dtlsHandshake(true)
break break
except CatchableError as exc: except CatchableError as exc:
trace "Handshake fail", remoteAddress = raddr, error = exc.msg trace "Handshake fail", remoteAddress = raddr, error = exc.msg
@ -301,6 +273,9 @@ proc connect*(self: Dtls, raddr: TransportAddress): Future[DtlsConn] {.async.} =
mb_ssl_init(res.ssl) mb_ssl_init(res.ssl)
mb_ssl_config_init(res.config) mb_ssl_config_init(res.config)
res.ctr_drbg = self.ctr_drbg
res.entropy = self.entropy
var pkey = res.ctr_drbg.generateKey() var pkey = res.ctr_drbg.generateKey()
var srvcert = res.ctr_drbg.generateCertificate(pkey) var srvcert = res.ctr_drbg.generateCertificate(pkey)
res.localCert = newSeq[byte](srvcert.raw.len) res.localCert = newSeq[byte](srvcert.raw.len)
@ -321,14 +296,13 @@ proc connect*(self: Dtls, raddr: TransportAddress): Future[DtlsConn] {.async.} =
mb_ssl_setup(res.ssl, res.config) mb_ssl_setup(res.ssl, res.config)
mb_ssl_set_verify(res.ssl, verify, res) mb_ssl_set_verify(res.ssl, verify, res)
mb_ssl_conf_authmode(res.config, MBEDTLS_SSL_VERIFY_OPTIONAL) mb_ssl_conf_authmode(res.config, MBEDTLS_SSL_VERIFY_OPTIONAL)
mb_ssl_set_bio(res.ssl, cast[pointer](res), mb_ssl_set_bio(res.ssl, cast[pointer](res), dtlsSend, dtlsRecv, nil)
dtlsSend, dtlsRecv, nil)
res.raddr = raddr res.raddr = raddr
self.connections[raddr] = res self.connections[raddr] = res
try: try:
await res.clientHandshake() await res.dtlsHandshake(false)
except CatchableError as exc: except CatchableError as exc:
trace "Handshake fail", remoteAddress = raddr, error = exc.msg trace "Handshake fail", remoteAddress = raddr, error = exc.msg
self.connections.del(raddr) self.connections.del(raddr)

View File

@ -64,7 +64,6 @@ type
sockServer: ptr socket sockServer: ptr socket
pendingConnections: seq[SctpConn] pendingConnections: seq[SctpConn]
pendingConnections2: Table[SockAddr, SctpConn] pendingConnections2: Table[SockAddr, SctpConn]
sentConnection: SctpConn
sentAddress: TransportAddress sentAddress: TransportAddress
sentFuture: Future[void] sentFuture: Future[void]
@ -161,24 +160,23 @@ proc write*(
sendParams = default(SctpMessageParameters), sendParams = default(SctpMessageParameters),
) {.async.} = ) {.async.} =
trace "Write", buf, sctp = cast[uint64](self), sock = cast[uint64](self.sctpSocket) trace "Write", buf, sctp = cast[uint64](self), sock = cast[uint64](self.sctpSocket)
self.sctp.sentConnection = self
self.sctp.sentAddress = self.address self.sctp.sentAddress = self.address
var cpy = buf var cpy = buf
var let sendvErr =
(sendInfo, infoType) = if sendParams == default(SctpMessageParameters):
if sendParams != default(SctpMessageParameters): self.usrsctpAwait:
(sctp_sndinfo( self.sctpSocket.usrsctp_sendv(cast[pointer](addr cpy[0]), cpy.len().uint, nil, 0,
snd_sid: sendParams.streamId, nil, 0, SCTP_SENDV_NOINFO.cuint, 0)
snd_ppid: sendParams.protocolId.swapBytes(), else:
snd_flags: sendParams.toFlags let sendInfo = sctp_sndinfo(
), cuint(SCTP_SENDV_SNDINFO)) snd_sid: sendParams.streamId,
else: snd_ppid: sendParams.protocolId.swapBytes(),
(default(sctp_sndinfo), cuint(SCTP_SENDV_NOINFO)) snd_flags: sendParams.toFlags)
sendvErr = self.usrsctpAwait: self.usrsctpAwait:
self.sctpSocket.usrsctp_sendv(cast[pointer](addr cpy[0]), cpy.len.uint, nil, 0, self.sctpSocket.usrsctp_sendv(cast[pointer](addr cpy[0]), cpy.len().uint, nil, 0,
cast[pointer](addr sendInfo), sizeof(sendInfo).SockLen, cast[pointer](addr sendInfo), sizeof(sendInfo).SockLen,
infoType, 0) SCTP_SENDV_SNDINFO.cuint, 0)
if sendvErr < 0: if sendvErr < 0:
perror("usrsctp_sendv") # TODO: throw an exception perror("usrsctp_sendv") # TODO: throw an exception
trace "write sendv error?", sendvErr, sendParams trace "write sendv error?", sendvErr, sendParams
@ -194,7 +192,7 @@ proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
conn = cast[SctpConn](data) conn = cast[SctpConn](data)
events = usrsctp_get_events(sock) events = usrsctp_get_events(sock)
trace "Handle Upcall", events trace "Handle Upcall", events, state = conn.state
if conn.state == Connecting: if conn.state == Connecting:
if bitand(events, SCTP_EVENT_ERROR) != 0: if bitand(events, SCTP_EVENT_ERROR) != 0:
warn "Cannot connect", address = conn.address warn "Cannot connect", address = conn.address
@ -202,7 +200,8 @@ proc handleUpcall(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
elif bitand(events, SCTP_EVENT_WRITE) != 0: elif bitand(events, SCTP_EVENT_WRITE) != 0:
conn.state = Connected conn.state = Connected
conn.connectEvent.fire() conn.connectEvent.fire()
elif bitand(events, SCTP_EVENT_READ) != 0:
if bitand(events, SCTP_EVENT_READ) != 0:
var var
message = SctpMessage( message = SctpMessage(
data: newSeq[byte](4096) data: newSeq[byte](4096)
@ -253,12 +252,12 @@ proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
sctp = cast[Sctp](data) sctp = cast[Sctp](data)
sctpSocket = usrsctp_accept(sctp.sockServer, cast[ptr SockAddr](addr sconn), addr slen) sctpSocket = usrsctp_accept(sctp.sockServer, cast[ptr SockAddr](addr sconn), addr slen)
doAssert 0 == sctpSocket.usrsctp_set_non_blocking(1)
let conn = cast[SctpConn](sconn.sconn_addr) let conn = cast[SctpConn](sconn.sconn_addr)
conn.sctpSocket = sctpSocket conn.sctpSocket = sctpSocket
conn.state = Connected conn.state = Connected
var nodelay: uint32 = 1 var nodelay: uint32 = 1
var recvinfo: uint32 = 1 var recvinfo: uint32 = 1
doAssert 0 == sctpSocket.usrsctp_set_non_blocking(1)
doAssert 0 == conn.sctpSocket.usrsctp_set_upcall(handleUpcall, cast[pointer](conn)) doAssert 0 == conn.sctpSocket.usrsctp_set_upcall(handleUpcall, cast[pointer](conn))
doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP, SCTP_NODELAY, doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP, SCTP_NODELAY,
addr nodelay, sizeof(nodelay).SockLen) addr nodelay, sizeof(nodelay).SockLen)
@ -266,36 +265,6 @@ proc handleAccept(sock: ptr socket, data: pointer, flags: cint) {.cdecl.} =
addr recvinfo, sizeof(recvinfo).SockLen) addr recvinfo, sizeof(recvinfo).SockLen)
conn.acceptEvent.fire() conn.acceptEvent.fire()
# proc getOrCreateConnection(self: Sctp,
# udp: DatagramTransport,
# address: TransportAddress,
# sctpPort: uint16 = 5000): Future[SctpConn] {.async.} =
# #TODO remove the = 5000
# if self.connections.hasKey(address):
# return self.connections[address]
# trace "Create Connection", address
# let
# sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
# conn = SctpConn.new(self, udp, address, sctpSocket)
# var on: int = 1
# doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP,
# SCTP_RECVRCVINFO,
# addr on,
# sizeof(on).SockLen)
# doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1)
# doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, cast[pointer](conn))
# var sconn: Sockaddr_conn
# sconn.sconn_family = AF_CONN
# sconn.sconn_port = htons(sctpPort)
# sconn.sconn_addr = cast[pointer](self)
# self.sentConnection = conn
# self.sentAddress = address
# let connErr = self.usrsctpAwait:
# conn.sctpSocket.usrsctp_connect(cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn)))
# doAssert 0 == connErr or errno == posix.EINPROGRESS, ($errno)
# self.connections[address] = conn
# return conn
proc sendCallback(ctx: pointer, proc sendCallback(ctx: pointer,
buffer: pointer, buffer: pointer,
length: uint, length: uint,
@ -310,6 +279,7 @@ proc sendCallback(ctx: pointer,
proc testSend() {.async.} = proc testSend() {.async.} =
try: try:
trace "Send To", address = sctpConn.address trace "Send To", address = sctpConn.address
# printSctpPacket(buf)
# TODO: defined it printSctpPacket(buf) # TODO: defined it printSctpPacket(buf)
await sctpConn.conn.write(buf) await sctpConn.conn.write(buf)
except CatchableError as exc: except CatchableError as exc:
@ -343,43 +313,6 @@ proc new*(T: typedesc[Sctp], dtls: Dtls, laddr: TransportAddress): T =
usrsctp_register_address(cast[pointer](sctp)) usrsctp_register_address(cast[pointer](sctp))
return sctp return sctp
#proc new*(T: typedesc[Sctp], port: uint16 = 9899): T =
# logScope: topics = "webrtc sctp"
# let sctp = T(gotConnection: newAsyncEvent())
# proc onReceive(udp: DatagramTransport, raddr: TransportAddress) {.async, gcsafe.} =
# let
# msg = udp.getMessage()
# data = usrsctp_dumppacket(unsafeAddr msg[0], uint(msg.len), SCTP_DUMP_INBOUND)
# if data != nil:
# if sctp.isServer:
# trace "onReceive (server)", data = data.packetPretty(), length = msg.len(), raddr
# else:
# trace "onReceive (client)", data = data.packetPretty(), length = msg.len(), raddr
# usrsctp_freedumpbuffer(data)
#
# if sctp.isServer:
# sctp.sentAddress = raddr
# usrsctp_conninput(cast[pointer](sctp), unsafeAddr msg[0], uint(msg.len), 0)
# else:
# let conn = await sctp.getOrCreateConnection(udp, raddr)
# sctp.sentConnection = conn
# sctp.sentAddress = raddr
# usrsctp_conninput(cast[pointer](sctp), unsafeAddr msg[0], uint(msg.len), 0)
# let
# localAddr = TransportAddress(family: AddressFamily.IPv4, port: Port(port))
# laddr = initTAddress("127.0.0.1:" & $port)
# udp = newDatagramTransport(onReceive, local = laddr)
# trace "local address", localAddr, laddr
# sctp.udp = udp
# sctp.timersHandler = timersHandler()
#
# usrsctp_init_nothreads(port, sendCallback, printf)
# discard usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_NONE)
# discard usrsctp_sysctl_set_sctp_ecn_enable(1)
# usrsctp_register_address(cast[pointer](sctp))
#
# return sctp
proc stop*(self: Sctp) {.async.} = proc stop*(self: Sctp) {.async.} =
discard self.usrsctpAwait usrsctp_finish() discard self.usrsctpAwait usrsctp_finish()
self.udp.close() self.udp.close()
@ -392,7 +325,7 @@ proc readLoopProc(res: SctpConn) {.async.} =
if not data.isNil(): if not data.isNil():
trace "Receive data", remoteAddress = res.conn.raddr, data = data.packetPretty() trace "Receive data", remoteAddress = res.conn.raddr, data = data.packetPretty()
usrsctp_freedumpbuffer(data) usrsctp_freedumpbuffer(data)
res.sctp.sentConnection = res # printSctpPacket(msg) TODO: defined it
usrsctp_conninput(cast[pointer](res), unsafeAddr msg[0], uint(msg.len), 0) usrsctp_conninput(cast[pointer](res), unsafeAddr msg[0], uint(msg.len), 0)
proc accept*(self: Sctp): Future[SctpConn] {.async.} = proc accept*(self: Sctp): Future[SctpConn] {.async.} =
@ -431,19 +364,32 @@ proc connect*(self: Sctp,
sctpPort: uint16 = 5000): Future[SctpConn] {.async.} = sctpPort: uint16 = 5000): Future[SctpConn] {.async.} =
let let
sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil) sctpSocket = usrsctp_socket(AF_CONN, posix.SOCK_STREAM, IPPROTO_SCTP, nil, nil, 0, nil)
res = SctpConn.new(await self.dtls.connect(address), self) conn = SctpConn.new(await self.dtls.connect(address), self)
#usrsctp_register_address(cast[pointer](res)) trace "Create Connection", address
conn.sctpSocket = sctpSocket
# trace "Connect", address, sctpPort conn.state = Connected
# let conn = await self.getOrCreateConnection(self.udp, address, sctpPort) var nodelay: uint32 = 1
# if conn.state == Connected: var recvinfo: uint32 = 1
# return conn doAssert 0 == usrsctp_set_non_blocking(conn.sctpSocket, 1)
# try: doAssert 0 == usrsctp_set_upcall(conn.sctpSocket, handleUpcall, cast[pointer](conn))
# await conn.connectEvent.wait() # TODO: clear? doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP, SCTP_NODELAY,
# except CancelledError as exc: addr nodelay, sizeof(nodelay).SockLen)
# conn.sctpSocket.usrsctp_close() doAssert 0 == conn.sctpSocket.usrsctp_setsockopt(IPPROTO_SCTP, SCTP_RECVRCVINFO,
# return nil addr recvinfo, sizeof(recvinfo).SockLen)
# if conn.state != Connected: var sconn: Sockaddr_conn
# raise newSctpError("Cannot connect to " & $address) sconn.sconn_family = AF_CONN
# return conn sconn.sconn_port = htons(sctpPort)
sconn.sconn_addr = cast[pointer](conn)
self.sentAddress = address
usrsctp_register_address(cast[pointer](conn))
conn.readLoop = conn.readLoopProc()
let connErr = self.usrsctpAwait:
conn.sctpSocket.usrsctp_connect(cast[ptr SockAddr](addr sconn), SockLen(sizeof(sconn)))
doAssert 0 == connErr or errno == posix.EINPROGRESS, ($errno)
conn.state = Connecting
conn.connectEvent.clear()
await conn.connectEvent.wait()
# TODO: check connection state, if closed throw some exception I guess
self.connections[address] = conn
return conn

View File

@ -24,7 +24,7 @@ proc init*(self: UdpConn, laddr: TransportAddress) =
proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} = proc onReceive(udp: DatagramTransport, address: TransportAddress) {.async, gcsafe.} =
let msg = udp.getMessage() let msg = udp.getMessage()
echo "\e[33m<UDP>\e[0;1m onReceive\e[0m" trace "UDP onReceive", msg
self.dataRecv.addLastNoWait((msg, address)) self.dataRecv.addLastNoWait((msg, address))
self.dataRecv = newAsyncQueue[(seq[byte], TransportAddress)]() self.dataRecv = newAsyncQueue[(seq[byte], TransportAddress)]()
@ -34,9 +34,9 @@ proc close*(self: UdpConn) {.async.} =
self.udp.close() self.udp.close()
proc write*(self: UdpConn, raddr: TransportAddress, msg: seq[byte]) {.async.} = proc write*(self: UdpConn, raddr: TransportAddress, msg: seq[byte]) {.async.} =
echo "\e[33m<UDP>\e[0;1m write\e[0m" trace "UDP write", msg
await self.udp.sendTo(raddr, msg) await self.udp.sendTo(raddr, msg)
proc read*(self: UdpConn): Future[(seq[byte], TransportAddress)] {.async.} = proc read*(self: UdpConn): Future[(seq[byte], TransportAddress)] {.async.} =
echo "\e[33m<UDP>\e[0;1m read\e[0m" trace "UDP read"
return await self.dataRecv.popFirst() return await self.dataRecv.popFirst()