// SPDX-FileCopyrightText: 2023 The Pion community // SPDX-License-Identifier: MIT package sctp import ( "math" "sync" "time" ) const ( // RTO.Initial in msec rtoInitial float64 = 1.0 * 1000 // RTO.Min in msec rtoMin float64 = 1.0 * 1000 // RTO.Max in msec defaultRTOMax float64 = 60.0 * 1000 // RTO.Alpha rtoAlpha float64 = 0.125 // RTO.Beta rtoBeta float64 = 0.25 // Max.Init.Retransmits: maxInitRetrans uint = 8 // Path.Max.Retrans pathMaxRetrans uint = 5 noMaxRetrans uint = 0 ) // rtoManager manages Rtx timeout values. // This is an implementation of RFC 4960 sec 6.3.1. type rtoManager struct { srtt float64 rttvar float64 rto float64 noUpdate bool mutex sync.RWMutex rtoMax float64 } // newRTOManager creates a new rtoManager. func newRTOManager(rtoMax float64) *rtoManager { mgr := rtoManager{ rto: rtoInitial, rtoMax: rtoMax, } if mgr.rtoMax == 0 { mgr.rtoMax = defaultRTOMax } return &mgr } // setNewRTT takes a newly measured RTT then adjust the RTO in msec. func (m *rtoManager) setNewRTT(rtt float64) float64 { m.mutex.Lock() defer m.mutex.Unlock() if m.noUpdate { return m.srtt } if m.srtt == 0 { // First measurement m.srtt = rtt m.rttvar = rtt / 2 } else { // Subsequent rtt measurement m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt)) m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt } m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), m.rtoMax) return m.srtt } // getRTO simply returns the current RTO in msec. func (m *rtoManager) getRTO() float64 { m.mutex.RLock() defer m.mutex.RUnlock() return m.rto } // reset resets the RTO variables to the initial values. func (m *rtoManager) reset() { m.mutex.Lock() defer m.mutex.Unlock() if m.noUpdate { return } m.srtt = 0 m.rttvar = 0 m.rto = rtoInitial } // set RTO value for testing func (m *rtoManager) setRTO(rto float64, noUpdate bool) { m.mutex.Lock() defer m.mutex.Unlock() m.rto = rto m.noUpdate = noUpdate } // rtxTimerObserver is the inteface to a timer observer. // NOTE: Observers MUST NOT call start() or stop() method on rtxTimer // from within these callbacks. type rtxTimerObserver interface { onRetransmissionTimeout(timerID int, n uint) onRetransmissionFailure(timerID int) } type rtxTimerState uint8 const ( rtxTimerStopped rtxTimerState = iota rtxTimerStarted rtxTimerClosed ) // rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 type rtxTimer struct { timer *time.Timer observer rtxTimerObserver id int maxRetrans uint rtoMax float64 mutex sync.Mutex rto float64 nRtos uint state rtxTimerState pending uint8 } // newRTXTimer creates a new retransmission timer. // if maxRetrans is set to 0, it will keep retransmitting until stop() is called. // (it will never make onRetransmissionFailure() callback. func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint, rtoMax float64, ) *rtxTimer { timer := rtxTimer{ id: id, observer: observer, maxRetrans: maxRetrans, rtoMax: rtoMax, } if timer.rtoMax == 0 { timer.rtoMax = defaultRTOMax } timer.timer = time.AfterFunc(math.MaxInt64, timer.timeout) timer.timer.Stop() return &timer } func (t *rtxTimer) calculateNextTimeout() time.Duration { timeout := calculateNextTimeout(t.rto, t.nRtos, t.rtoMax) return time.Duration(timeout) * time.Millisecond } func (t *rtxTimer) timeout() { t.mutex.Lock() if t.pending--; t.pending == 0 && t.state == rtxTimerStarted { if t.nRtos++; t.maxRetrans == 0 || t.nRtos <= t.maxRetrans { t.timer.Reset(t.calculateNextTimeout()) t.pending++ defer t.observer.onRetransmissionTimeout(t.id, t.nRtos) } else { t.state = rtxTimerStopped defer t.observer.onRetransmissionFailure(t.id) } } t.mutex.Unlock() } // start starts the timer. func (t *rtxTimer) start(rto float64) bool { t.mutex.Lock() defer t.mutex.Unlock() // this timer is already closed or aleady running if t.state != rtxTimerStopped { return false } // Note: rto value is intentionally not capped by RTO.Min to allow // fast timeout for the tests. Non-test code should pass in the // rto generated by rtoManager getRTO() method which caps the // value at RTO.Min or at RTO.Max. t.rto = rto t.nRtos = 0 t.state = rtxTimerStarted t.pending++ t.timer.Reset(t.calculateNextTimeout()) return true } // stop stops the timer. func (t *rtxTimer) stop() { t.mutex.Lock() defer t.mutex.Unlock() if t.state == rtxTimerStarted { if t.timer.Stop() { t.pending-- } t.state = rtxTimerStopped } } // closes the timer. this is similar to stop() but subsequent start() call // will fail (the timer is no longer usable) func (t *rtxTimer) close() { t.mutex.Lock() defer t.mutex.Unlock() if t.state == rtxTimerStarted && t.timer.Stop() { t.pending-- } t.state = rtxTimerClosed } // isRunning tests if the timer is running. // Debug purpose only func (t *rtxTimer) isRunning() bool { t.mutex.Lock() defer t.mutex.Unlock() return t.state == rtxTimerStarted } func calculateNextTimeout(rto float64, nRtos uint, rtoMax float64) float64 { // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration // E2) For the destination address for which the timer expires, set RTO // <- RTO * 2 ("back off the timer"). The maximum value discussed // in rule C7 above (RTO.max) may be used to provide an upper bound // to this doubling operation. if nRtos < 31 { m := 1 << nRtos return math.Min(rto*float64(m), rtoMax) } return rtoMax }