feat: add a proper tracker management (#17)

* feat: add a proper tracker management

* chore: remove newline

* fix: use closeWait instead of close

* chore: replace custom made checkTrackers by chronos checkLeaks

* chore: renamed UdpTransport.stop into close

* chore: remove flakyAsyncTest
This commit is contained in:
Ludovic Chenut 2024-07-05 13:55:36 +02:00 committed by GitHub
parent 11111e6fc7
commit 81b91e32a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 35 additions and 87 deletions

View File

@ -1,6 +1,7 @@
import unittest2, chronos import unittest2, chronos
import chronos/unittest2/asynctests
export unittest2, chronos export unittest2, chronos, asynctests
template asyncTeardown*(body: untyped): untyped = template asyncTeardown*(body: untyped): untyped =
teardown: teardown:
@ -15,32 +16,3 @@ template asyncSetup*(body: untyped): untyped =
proc() {.async, gcsafe.} = proc() {.async, gcsafe.} =
body body
)()) )())
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped =
test name:
var attemptNumber = 0
while attemptNumber < attempts:
let isLastAttempt = attemptNumber == attempts - 1
inc attemptNumber
try:
waitFor((
proc() {.async, gcsafe.} =
body
)())
except Exception as e:
if isLastAttempt: raise e
else: testStatusIMPL = TestStatus.FAILED
finally:
if not isLastAttempt:
if testStatusIMPL == TestStatus.FAILED:
# Retry
testStatusIMPL = TestStatus.OK
else:
break

View File

@ -1,48 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import chronos
import unittest2
export unittest2
const
StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server"
DgramTransportTrackerName = "datagram.transport"
trackerNames = [
StreamTransportTrackerName,
StreamServerTrackerName,
DgramTransportTrackerName,
]
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((proc () {.async, gcsafe.} = body)())
iterator testTrackers*(extras: openArray[string] = []): TrackerBase =
for name in trackerNames:
let t = getTracker(name)
if not isNil(t): yield t
for name in extras:
let t = getTracker(name)
if not isNil(t): yield t
template checkTracker*(name: string) =
var tracker = getTracker(name)
if tracker.isLeaked():
checkpoint tracker.dump()
fail()
template checkTrackers*() =
for tracker in testTrackers():
if tracker.isLeaked():
checkpoint tracker.dump()
fail()
# Also test the GC is not fooling with us
try:
GC_fullCollect()
except:
discard

View File

@ -9,6 +9,7 @@
{.used.} {.used.}
import chronos
import options import options
import bearssl import bearssl
import ../webrtc/udp_transport import ../webrtc/udp_transport
@ -28,7 +29,10 @@ proc passwordProvEmpty(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} =
proc passwordProvTest(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} = @[1'u8, 2, 3, 4] proc passwordProvTest(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} = @[1'u8, 2, 3, 4]
suite "Stun message encoding/decoding": suite "Stun message encoding/decoding":
test "Get BindingRequest + encode & decode with a set username": teardown:
checkLeaks()
asyncTest "Get BindingRequest + encode & decode with a set username":
var var
udp = UdpTransport.new(AnyAddress) udp = UdpTransport.new(AnyAddress)
conn = StunConn.new( conn = StunConn.new(
@ -52,8 +56,9 @@ suite "Stun message encoding/decoding":
messageIntegrity.attributeType == AttrMessageIntegrity.uint16 messageIntegrity.attributeType == AttrMessageIntegrity.uint16
fingerprint.attributeType == AttrFingerprint.uint16 fingerprint.attributeType == AttrFingerprint.uint16
conn.close() conn.close()
await udp.close()
test "Get BindingResponse from BindingRequest + encode & decode": asyncTest "Get BindingResponse from BindingRequest + encode & decode":
var var
udp = UdpTransport.new(AnyAddress) udp = UdpTransport.new(AnyAddress)
conn = StunConn.new( conn = StunConn.new(
@ -77,9 +82,14 @@ suite "Stun message encoding/decoding":
bindingResponse == decoded bindingResponse == decoded
messageIntegrity.attributeType == AttrMessageIntegrity.uint16 messageIntegrity.attributeType == AttrMessageIntegrity.uint16
fingerprint.attributeType == AttrFingerprint.uint16 fingerprint.attributeType == AttrFingerprint.uint16
conn.close()
await udp.close()
suite "Stun checkForError": suite "Stun checkForError":
test "checkForError: Missing MessageIntegrity or Username": teardown:
checkLeaks()
asyncTest "checkForError: Missing MessageIntegrity or Username":
var var
udp = UdpTransport.new(AnyAddress) udp = UdpTransport.new(AnyAddress)
conn = StunConn.new( conn = StunConn.new(
@ -104,8 +114,10 @@ suite "Stun checkForError":
check: check:
errorMissUsername.getAttribute(ErrorCode).get().getErrorCode() == ECBadRequest errorMissUsername.getAttribute(ErrorCode).get().getErrorCode() == ECBadRequest
conn.close()
await udp.close()
test "checkForError: UsernameChecker returns false": asyncTest "checkForError: UsernameChecker returns false":
var var
udp = UdpTransport.new(AnyAddress) udp = UdpTransport.new(AnyAddress)
conn = StunConn.new( conn = StunConn.new(
@ -124,3 +136,5 @@ suite "Stun checkForError":
check: check:
error.getAttribute(ErrorCode).get().getErrorCode() == ECUnauthorized error.getAttribute(ErrorCode).get().getErrorCode() == ECUnauthorized
conn.close()
await udp.close()

View File

@ -18,6 +18,7 @@ logScope:
# - Need to implement ICE-CONTROLL(ED|ING) for browser to browser (not critical) # - Need to implement ICE-CONTROLL(ED|ING) for browser to browser (not critical)
const const
StunConnectionTracker* = "webrtc.stun.connection"
StunMaxQueuingMessages = 1024 StunMaxQueuingMessages = 1024
StunBindingRequest* = 0x0001'u16 StunBindingRequest* = 0x0001'u16
StunBindingResponse* = 0x0101'u16 StunBindingResponse* = 0x0101'u16
@ -211,6 +212,7 @@ proc new*(
rng: rng rng: rng
) )
self.handlesFut = self.stunMessageHandler() self.handlesFut = self.stunMessageHandler()
trackCounter(StunConnectionTracker)
return self return self
proc join*(self: StunConn) {.async: (raises: [CancelledError]).} = proc join*(self: StunConn) {.async: (raises: [CancelledError]).} =
@ -227,6 +229,7 @@ proc close*(self: StunConn) =
self.closeEvent.fire() self.closeEvent.fire()
self.handlesFut.cancelSoon() self.handlesFut.cancelSoon()
self.closed = true self.closed = true
untrackCounter(StunConnectionTracker)
proc write*( proc write*(
self: StunConn, self: StunConn,

View File

@ -15,6 +15,7 @@ logScope:
topics = "webrtc stun stun_transport" topics = "webrtc stun stun_transport"
const const
StunTransportTracker* = "webrtc.stun.transport"
StunMaxPendingConnections = 512 StunMaxPendingConnections = 512
type type
@ -89,6 +90,7 @@ proc stop(self: Stun) =
for conn in self.connections.values(): for conn in self.connections.values():
conn.close() conn.close()
self.readingLoop.cancelSoon() self.readingLoop.cancelSoon()
untrackCounter(StunTransportTracker)
proc defaultUsernameProvider(): string = "" proc defaultUsernameProvider(): string = ""
proc defaultUsernameChecker(username: seq[byte]): bool = true proc defaultUsernameChecker(username: seq[byte]): bool = true
@ -113,4 +115,5 @@ proc new*(
) )
self.readingLoop = stunReadLoop() self.readingLoop = stunReadLoop()
self.pendingConn = newAsyncQueue[StunConn](StunMaxPendingConnections) self.pendingConn = newAsyncQueue[StunConn](StunMaxPendingConnections)
trackCounter(StunTransportTracker)
return self return self

View File

@ -28,6 +28,8 @@ type
dataRecv: AsyncQueue[UdpPacketInfo] dataRecv: AsyncQueue[UdpPacketInfo]
closed: bool closed: bool
const UdpTransportTrackerName* = "webrtc.udp.transport"
proc new*(T: type UdpTransport, laddr: TransportAddress): T = proc new*(T: type UdpTransport, laddr: TransportAddress): T =
## Initialize an Udp Transport ## Initialize an Udp Transport
## ##
@ -47,16 +49,18 @@ proc new*(T: type UdpTransport, laddr: TransportAddress): T =
self.dataRecv = newAsyncQueue[UdpPacketInfo]() self.dataRecv = newAsyncQueue[UdpPacketInfo]()
self.udp = newDatagramTransport(onReceive, local = laddr) self.udp = newDatagramTransport(onReceive, local = laddr)
trackCounter(UdpTransportTrackerName)
return self return self
proc close*(self: UdpTransport) = proc close*(self: UdpTransport) {.async: (raises: []).} =
## Close an Udp Transport ## Close an Udp Transport
## ##
if self.closed: if self.closed:
debug "Trying to close an already closed UdpConn" debug "Trying to stop an already stopped UdpTransport"
return return
self.closed = true self.closed = true
self.udp.close() await self.udp.closeWait()
untrackCounter(UdpTransportTrackerName)
proc write*( proc write*(
self: UdpTransport, self: UdpTransport,
@ -66,7 +70,7 @@ proc write*(
## Write a message on Udp to a remote address `raddr` ## Write a message on Udp to a remote address `raddr`
## ##
if self.closed: if self.closed:
debug "Try to write on an already closed UdpConn" debug "Try to write on an already closed UdpTransport"
return return
trace "UDP write", msg trace "UDP write", msg
try: try:
@ -79,7 +83,7 @@ proc read*(self: UdpTransport): Future[UdpPacketInfo] {.async: (raises: [Cancell
## Read the next received Udp message ## Read the next received Udp message
## ##
if self.closed: if self.closed:
debug "Try to read on an already closed UdpConn" debug "Try to read on an already closed UdpTransport"
return return
trace "UDP read" trace "UDP read"
return await self.dataRecv.popFirst() return await self.dataRecv.popFirst()