mirror of https://github.com/vacp2p/nim-webrtc.git
141 lines
4.6 KiB
Nim
141 lines
4.6 KiB
Nim
# Nim-WebRTC
|
|
# Copyright (c) 2024 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
# at your option.
|
|
# This file may not be copied, modified, or distributed except according to
|
|
# those terms.
|
|
|
|
{.used.}
|
|
|
|
import chronos
|
|
import options
|
|
import bearssl
|
|
import ../webrtc/udp_transport
|
|
import ../webrtc/stun/stun_connection
|
|
import ../webrtc/stun/stun_message
|
|
import ../webrtc/stun/stun_attributes
|
|
import ./asyncunit
|
|
|
|
proc newRng(): ref HmacDrbgContext =
|
|
HmacDrbgContext.new()
|
|
|
|
proc usernameProvEmpty(): string = ""
|
|
proc usernameProvTest(): string {.raises: [], gcsafe.} = "TestUsername"
|
|
proc usernameCheckTrue(username: seq[byte]): bool {.raises: [], gcsafe.} = true
|
|
proc usernameCheckFalse(username: seq[byte]): bool {.raises: [], gcsafe.} = false
|
|
proc passwordProvEmpty(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} = @[]
|
|
proc passwordProvTest(username: seq[byte]): seq[byte] {.raises: [], gcsafe.} = @[1'u8, 2, 3, 4]
|
|
|
|
suite "Stun message encoding/decoding":
|
|
teardown:
|
|
checkLeaks()
|
|
|
|
asyncTest "Get BindingRequest + encode & decode with a set username":
|
|
var
|
|
udp = UdpTransport.new(AnyAddress)
|
|
conn = StunConn.new(
|
|
udp,
|
|
TransportAddress(AnyAddress),
|
|
iceControlling=true,
|
|
usernameProvider=usernameProvTest,
|
|
usernameChecker=usernameCheckTrue,
|
|
passwordProvider=passwordProvEmpty,
|
|
newRng()
|
|
)
|
|
msg = conn.getBindingRequest()
|
|
encoded = msg.encode(@[1'u8, 2, 3, 4])
|
|
decoded = StunMessage.decode(encoded)
|
|
messageIntegrity = decoded.attributes[^2]
|
|
fingerprint = decoded.attributes[^1]
|
|
|
|
decoded.attributes = decoded.attributes[0 ..< ^2]
|
|
check:
|
|
decoded == msg
|
|
messageIntegrity.attributeType == AttrMessageIntegrity.uint16
|
|
fingerprint.attributeType == AttrFingerprint.uint16
|
|
await conn.close()
|
|
await udp.close()
|
|
|
|
asyncTest "Get BindingResponse from BindingRequest + encode & decode":
|
|
var
|
|
udp = UdpTransport.new(AnyAddress)
|
|
conn = StunConn.new(
|
|
udp,
|
|
TransportAddress(AnyAddress),
|
|
iceControlling=false,
|
|
usernameProvider=usernameProvTest,
|
|
usernameChecker=usernameCheckTrue,
|
|
passwordProvider=passwordProvEmpty,
|
|
newRng()
|
|
)
|
|
bindingRequest = conn.getBindingRequest()
|
|
bindingResponse = conn.getBindingResponse(bindingRequest)
|
|
encoded = bindingResponse.encode(@[1'u8, 2, 3, 4])
|
|
decoded = StunMessage.decode(encoded)
|
|
messageIntegrity = decoded.attributes[^2]
|
|
fingerprint = decoded.attributes[^1]
|
|
|
|
decoded.attributes = decoded.attributes[0 ..< ^2]
|
|
check:
|
|
bindingResponse == decoded
|
|
messageIntegrity.attributeType == AttrMessageIntegrity.uint16
|
|
fingerprint.attributeType == AttrFingerprint.uint16
|
|
await conn.close()
|
|
await udp.close()
|
|
|
|
suite "Stun checkForError":
|
|
teardown:
|
|
checkLeaks()
|
|
|
|
asyncTest "checkForError: Missing MessageIntegrity or Username":
|
|
var
|
|
udp = UdpTransport.new(AnyAddress)
|
|
conn = StunConn.new(
|
|
udp,
|
|
TransportAddress(AnyAddress),
|
|
iceControlling=false,
|
|
usernameProvider=usernameProvEmpty, # Use of an empty username provider
|
|
usernameChecker=usernameCheckTrue,
|
|
passwordProvider=passwordProvEmpty,
|
|
newRng()
|
|
)
|
|
bindingRequest = conn.getBindingRequest()
|
|
errorMissMessageIntegrity = conn.checkForError(bindingRequest).get()
|
|
|
|
check:
|
|
errorMissMessageIntegrity.getAttribute(ErrorCode).get().getErrorCode() == ECBadRequest
|
|
|
|
let
|
|
encoded = bindingRequest.encode(@[1'u8, 2, 3, 4]) # adds MessageIntegrity
|
|
decoded = StunMessage.decode(encoded)
|
|
errorMissUsername = conn.checkForError(decoded).get()
|
|
|
|
check:
|
|
errorMissUsername.getAttribute(ErrorCode).get().getErrorCode() == ECBadRequest
|
|
await conn.close()
|
|
await udp.close()
|
|
|
|
asyncTest "checkForError: UsernameChecker returns false":
|
|
var
|
|
udp = UdpTransport.new(AnyAddress)
|
|
conn = StunConn.new(
|
|
udp,
|
|
TransportAddress(AnyAddress),
|
|
iceControlling=false,
|
|
usernameProvider=usernameProvTest,
|
|
usernameChecker=usernameCheckFalse, # Username provider returns false
|
|
passwordProvider=passwordProvEmpty,
|
|
newRng()
|
|
)
|
|
bindingRequest = conn.getBindingRequest()
|
|
encoded = bindingRequest.encode(@[0'u8, 1, 2, 3])
|
|
decoded = StunMessage.decode(encoded)
|
|
error = conn.checkForError(decoded).get()
|
|
|
|
check:
|
|
error.getAttribute(ErrorCode).get().getErrorCode() == ECUnauthorized
|
|
await conn.close()
|
|
await udp.close()
|