614 lines
15 KiB
Go
Raw Normal View History

2022-03-10 10:44:48 +01:00
// Package client implements the API for a TURN client
package client
import (
"errors"
"fmt"
"io"
"math"
"net"
"sync"
"time"
"github.com/pion/logging"
"github.com/pion/stun"
"github.com/pion/turn/v2/internal/proto"
)
const (
maxReadQueueSize = 1024
permRefreshInterval = 120 * time.Second
maxRetryAttempts = 3
)
const (
timerIDRefreshAlloc int = iota
timerIDRefreshPerms
)
func noDeadline() time.Time {
return time.Time{}
}
type inboundData struct {
data []byte
from net.Addr
}
// UDPConnObserver is an interface to UDPConn observer
type UDPConnObserver interface {
TURNServerAddr() net.Addr
Username() stun.Username
Realm() stun.Realm
WriteTo(data []byte, to net.Addr) (int, error)
PerformTransaction(msg *stun.Message, to net.Addr, dontWait bool) (TransactionResult, error)
OnDeallocated(relayedAddr net.Addr)
}
// UDPConnConfig is a set of configuration params use by NewUDPConn
type UDPConnConfig struct {
Observer UDPConnObserver
RelayedAddr net.Addr
Integrity stun.MessageIntegrity
Nonce stun.Nonce
Lifetime time.Duration
Log logging.LeveledLogger
}
// UDPConn is the implementation of the Conn and PacketConn interfaces for UDP network connections.
// comatible with net.PacketConn and net.Conn
type UDPConn struct {
obs UDPConnObserver // read-only
relayedAddr net.Addr // read-only
permMap *permissionMap // thread-safe
bindingMgr *bindingManager // thread-safe
integrity stun.MessageIntegrity // read-only
_nonce stun.Nonce // needs mutex x
_lifetime time.Duration // needs mutex x
readCh chan *inboundData // thread-safe
closeCh chan struct{} // thread-safe
readTimer *time.Timer // thread-safe
refreshAllocTimer *PeriodicTimer // thread-safe
refreshPermsTimer *PeriodicTimer // thread-safe
mutex sync.RWMutex // thread-safe
log logging.LeveledLogger // read-only
}
// NewUDPConn creates a new instance of UDPConn
func NewUDPConn(config *UDPConnConfig) *UDPConn {
c := &UDPConn{
obs: config.Observer,
relayedAddr: config.RelayedAddr,
permMap: newPermissionMap(),
bindingMgr: newBindingManager(),
integrity: config.Integrity,
_nonce: config.Nonce,
_lifetime: config.Lifetime,
readCh: make(chan *inboundData, maxReadQueueSize),
closeCh: make(chan struct{}),
readTimer: time.NewTimer(time.Duration(math.MaxInt64)),
log: config.Log,
}
c.log.Debugf("initial lifetime: %d seconds", int(c.lifetime().Seconds()))
c.refreshAllocTimer = NewPeriodicTimer(
timerIDRefreshAlloc,
c.onRefreshTimers,
c.lifetime()/2,
)
c.refreshPermsTimer = NewPeriodicTimer(
timerIDRefreshPerms,
c.onRefreshTimers,
permRefreshInterval,
)
if c.refreshAllocTimer.Start() {
c.log.Debugf("refreshAllocTimer started")
}
if c.refreshPermsTimer.Start() {
c.log.Debugf("refreshPermsTimer started")
}
return c
}
// ReadFrom reads a packet from the connection,
// copying the payload into p. It returns the number of
// bytes copied into p and the return address that
// was on the packet.
// It returns the number of bytes read (0 <= n <= len(p))
// and any error encountered. Callers should always process
// the n > 0 bytes returned before considering the error err.
// ReadFrom can be made to time out and return
// an Error with Timeout() == true after a fixed time limit;
// see SetDeadline and SetReadDeadline.
func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
for {
select {
case ibData := <-c.readCh:
n := copy(p, ibData.data)
if n < len(ibData.data) {
return 0, nil, io.ErrShortBuffer
}
return n, ibData.from, nil
case <-c.readTimer.C:
return 0, nil, &net.OpError{
Op: "read",
Net: c.LocalAddr().Network(),
Addr: c.LocalAddr(),
Err: newTimeoutError("i/o timeout"),
}
case <-c.closeCh:
return 0, nil, &net.OpError{
Op: "read",
Net: c.LocalAddr().Network(),
Addr: c.LocalAddr(),
Err: errClosed,
}
}
}
}
// WriteTo writes a packet with payload p to addr.
// WriteTo can be made to time out and return
// an Error with Timeout() == true after a fixed time limit;
// see SetDeadline and SetWriteDeadline.
// On packet-oriented connections, write timeouts are rare.
func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (int, error) { //nolint: gocognit
var err error
_, ok := addr.(*net.UDPAddr)
if !ok {
return 0, errUDPAddrCast
}
// check if we have a permission for the destination IP addr
perm, ok := c.permMap.find(addr)
if !ok {
perm = &permission{}
c.permMap.insert(addr, perm)
}
// This func-block would block, per destination IP (, or perm), until
// the perm state becomes "requested". Purpose of this is to guarantee
// the order of packets (within the same perm).
// Note that CreatePermission transaction may not be complete before
// all the data transmission. This is done assuming that the request
// will be mostly likely successful and we can tolerate some loss of
// UDP packet (or reorder), inorder to minimize the latency in most cases.
createPermission := func() error {
perm.mutex.Lock()
defer perm.mutex.Unlock()
if perm.state() == permStateIdle {
// punch a hole! (this would block a bit..)
if err = c.createPermissions(addr); err != nil {
c.permMap.delete(addr)
return err
}
perm.setState(permStatePermitted)
}
return nil
}
for i := 0; i < maxRetryAttempts; i++ {
if err = createPermission(); !errors.Is(err, errTryAgain) {
break
}
}
if err != nil {
return 0, err
}
// bind channel
b, ok := c.bindingMgr.findByAddr(addr)
if !ok {
b = c.bindingMgr.create(addr)
}
bindSt := b.state()
if bindSt == bindingStateIdle || bindSt == bindingStateRequest || bindSt == bindingStateFailed {
func() {
// block only callers with the same binding until
// the binding transaction has been complete
b.muBind.Lock()
defer b.muBind.Unlock()
// binding state may have been changed while waiting. check again.
if b.state() == bindingStateIdle {
b.setState(bindingStateRequest)
go func() {
err2 := c.bind(b)
if err2 != nil {
c.log.Warnf("bind() failed: %s", err2.Error())
b.setState(bindingStateFailed)
// keep going...
} else {
b.setState(bindingStateReady)
}
}()
}
}()
// send data using SendIndication
peerAddr := addr2PeerAddress(addr)
var msg *stun.Message
msg, err = stun.Build(
stun.TransactionID,
stun.NewType(stun.MethodSend, stun.ClassIndication),
proto.Data(p),
peerAddr,
stun.Fingerprint,
)
if err != nil {
return 0, err
}
// indication has no transaction (fire-and-forget)
return c.obs.WriteTo(msg.Raw, c.obs.TURNServerAddr())
}
// binding is either ready
// check if the binding needs a refresh
func() {
b.muBind.Lock()
defer b.muBind.Unlock()
if b.state() == bindingStateReady && time.Since(b.refreshedAt()) > 5*time.Minute {
b.setState(bindingStateRefresh)
go func() {
err = c.bind(b)
if err != nil {
c.log.Warnf("bind() for refresh failed: %s", err.Error())
b.setState(bindingStateFailed)
// keep going...
} else {
b.setRefreshedAt(time.Now())
b.setState(bindingStateReady)
}
}()
}
}()
// send via ChannelData
return c.sendChannelData(p, b.number)
}
// Close closes the connection.
// Any blocked ReadFrom or WriteTo operations will be unblocked and return errors.
func (c *UDPConn) Close() error {
c.refreshAllocTimer.Stop()
c.refreshPermsTimer.Stop()
select {
case <-c.closeCh:
return errAlreadyClosed
default:
close(c.closeCh)
}
c.obs.OnDeallocated(c.relayedAddr)
return c.refreshAllocation(0, true /* dontWait=true */)
}
// LocalAddr returns the local network address.
func (c *UDPConn) LocalAddr() net.Addr {
return c.relayedAddr
}
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail with a timeout (see type Error) instead of
// blocking. The deadline applies to all future and pending
// I/O, not just the immediately following call to ReadFrom or
// WriteTo. After a deadline has been exceeded, the connection
// can be refreshed by setting a deadline in the future.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful ReadFrom or WriteTo calls.
//
// A zero value for t means I/O operations will not time out.
func (c *UDPConn) SetDeadline(t time.Time) error {
return c.SetReadDeadline(t)
}
// SetReadDeadline sets the deadline for future ReadFrom calls
// and any currently-blocked ReadFrom call.
// A zero value for t means ReadFrom will not time out.
func (c *UDPConn) SetReadDeadline(t time.Time) error {
var d time.Duration
if t == noDeadline() {
d = time.Duration(math.MaxInt64)
} else {
d = time.Until(t)
}
c.readTimer.Reset(d)
return nil
}
// SetWriteDeadline sets the deadline for future WriteTo calls
// and any currently-blocked WriteTo call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means WriteTo will not time out.
func (c *UDPConn) SetWriteDeadline(t time.Time) error {
// Write never blocks.
return nil
}
func addr2PeerAddress(addr net.Addr) proto.PeerAddress {
var peerAddr proto.PeerAddress
switch a := addr.(type) {
case *net.UDPAddr:
peerAddr.IP = a.IP
peerAddr.Port = a.Port
case *net.TCPAddr:
peerAddr.IP = a.IP
peerAddr.Port = a.Port
}
return peerAddr
}
func (c *UDPConn) createPermissions(addrs ...net.Addr) error {
setters := []stun.Setter{
stun.TransactionID,
stun.NewType(stun.MethodCreatePermission, stun.ClassRequest),
}
for _, addr := range addrs {
setters = append(setters, addr2PeerAddress(addr))
}
setters = append(setters,
c.obs.Username(),
c.obs.Realm(),
c.nonce(),
c.integrity,
stun.Fingerprint)
msg, err := stun.Build(setters...)
if err != nil {
return err
}
trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), false)
if err != nil {
return err
}
res := trRes.Msg
if res.Type.Class == stun.ClassErrorResponse {
var code stun.ErrorCodeAttribute
if err = code.GetFrom(res); err == nil {
if code.Code == stun.CodeStaleNonce {
c.setNonceFromMsg(res)
return errTryAgain
}
return fmt.Errorf("%s (error %s)", res.Type, code) //nolint:goerr113
}
return fmt.Errorf("%s", res.Type) //nolint:goerr113
}
return nil
}
// HandleInbound passes inbound data in UDPConn
func (c *UDPConn) HandleInbound(data []byte, from net.Addr) {
// copy data
copied := make([]byte, len(data))
copy(copied, data)
select {
case c.readCh <- &inboundData{data: copied, from: from}:
default:
c.log.Warnf("receive buffer full")
}
}
// FindAddrByChannelNumber returns a peer address associated with the
// channel number on this UDPConn
func (c *UDPConn) FindAddrByChannelNumber(chNum uint16) (net.Addr, bool) {
b, ok := c.bindingMgr.findByNumber(chNum)
if !ok {
return nil, false
}
return b.addr, true
}
func (c *UDPConn) setNonceFromMsg(msg *stun.Message) {
// Update nonce
var nonce stun.Nonce
if err := nonce.GetFrom(msg); err == nil {
c.setNonce(nonce)
c.log.Debug("refresh allocation: 438, got new nonce.")
} else {
c.log.Warn("refresh allocation: 438 but no nonce.")
}
}
func (c *UDPConn) refreshAllocation(lifetime time.Duration, dontWait bool) error {
msg, err := stun.Build(
stun.TransactionID,
stun.NewType(stun.MethodRefresh, stun.ClassRequest),
proto.Lifetime{Duration: lifetime},
c.obs.Username(),
c.obs.Realm(),
c.nonce(),
c.integrity,
stun.Fingerprint,
)
if err != nil {
return fmt.Errorf("%w: %s", errFailedToBuildRefreshRequest, err.Error())
}
c.log.Debugf("send refresh request (dontWait=%v)", dontWait)
trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), dontWait)
if err != nil {
return fmt.Errorf("%w: %s", errFailedToRefreshAllocation, err.Error())
}
if dontWait {
c.log.Debug("refresh request sent")
return nil
}
c.log.Debug("refresh request sent, and waiting response")
res := trRes.Msg
if res.Type.Class == stun.ClassErrorResponse {
var code stun.ErrorCodeAttribute
if err = code.GetFrom(res); err == nil {
if code.Code == stun.CodeStaleNonce {
c.setNonceFromMsg(res)
return errTryAgain
}
return err
}
return fmt.Errorf("%s", res.Type) //nolint:goerr113
}
// Getting lifetime from response
var updatedLifetime proto.Lifetime
if err := updatedLifetime.GetFrom(res); err != nil {
return fmt.Errorf("%w: %s", errFailedToGetLifetime, err.Error())
}
c.setLifetime(updatedLifetime.Duration)
c.log.Debugf("updated lifetime: %d seconds", int(c.lifetime().Seconds()))
return nil
}
func (c *UDPConn) refreshPermissions() error {
addrs := c.permMap.addrs()
if len(addrs) == 0 {
c.log.Debug("no permission to refresh")
return nil
}
if err := c.createPermissions(addrs...); err != nil {
if errors.Is(err, errTryAgain) {
return errTryAgain
}
c.log.Errorf("fail to refresh permissions: %s", err.Error())
return err
}
c.log.Debug("refresh permissions successful")
return nil
}
func (c *UDPConn) bind(b *binding) error {
setters := []stun.Setter{
stun.TransactionID,
stun.NewType(stun.MethodChannelBind, stun.ClassRequest),
addr2PeerAddress(b.addr),
proto.ChannelNumber(b.number),
c.obs.Username(),
c.obs.Realm(),
c.nonce(),
c.integrity,
stun.Fingerprint,
}
msg, err := stun.Build(setters...)
if err != nil {
return err
}
trRes, err := c.obs.PerformTransaction(msg, c.obs.TURNServerAddr(), false)
if err != nil {
c.bindingMgr.deleteByAddr(b.addr)
return err
}
res := trRes.Msg
if res.Type != stun.NewType(stun.MethodChannelBind, stun.ClassSuccessResponse) {
return fmt.Errorf("unexpected response type %s", res.Type) //nolint:goerr113
}
c.log.Debugf("channel binding successful: %s %d", b.addr.String(), b.number)
// Success.
return nil
}
func (c *UDPConn) sendChannelData(data []byte, chNum uint16) (int, error) {
chData := &proto.ChannelData{
Data: data,
Number: proto.ChannelNumber(chNum),
}
chData.Encode()
return c.obs.WriteTo(chData.Raw, c.obs.TURNServerAddr())
}
func (c *UDPConn) onRefreshTimers(id int) {
c.log.Debugf("refresh timer %d expired", id)
switch id {
case timerIDRefreshAlloc:
var err error
lifetime := c.lifetime()
// limit the max retries on errTryAgain to 3
// when stale nonce returns, sencond retry should succeed
for i := 0; i < maxRetryAttempts; i++ {
err = c.refreshAllocation(lifetime, false)
if !errors.Is(err, errTryAgain) {
break
}
}
if err != nil {
c.log.Warnf("refresh allocation failed")
}
case timerIDRefreshPerms:
var err error
for i := 0; i < maxRetryAttempts; i++ {
err = c.refreshPermissions()
if !errors.Is(err, errTryAgain) {
break
}
}
if err != nil {
c.log.Warnf("refresh permissions failed")
}
}
}
func (c *UDPConn) nonce() stun.Nonce {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c._nonce
}
func (c *UDPConn) setNonce(nonce stun.Nonce) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.log.Debugf("set new nonce with %d bytes", len(nonce))
c._nonce = nonce
}
func (c *UDPConn) lifetime() time.Duration {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c._lifetime
}
func (c *UDPConn) setLifetime(lifetime time.Duration) {
c.mutex.Lock()
defer c.mutex.Unlock()
c._lifetime = lifetime
}