Prem Chaitanya Prathi 97db14083a
chore_: bump go-waku with filter loop fix (#5909)
* chore_: bump go-waku with filter loop fix

* fix_: correct fleet node for staging fleet

* fix_: use shards for lightclient init

---------

Co-authored-by: Richard Ramos <info@richardramos.me>
2024-10-10 17:03:36 +05:30

155 lines
3.2 KiB
Go

// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package webrtc
import (
"container/list"
"sync"
)
// Operation is a function
type operation func()
// Operations is a task executor.
type operations struct {
mu sync.Mutex
busyCh chan struct{}
ops *list.List
updateNegotiationNeededFlagOnEmptyChain *atomicBool
onNegotiationNeeded func()
isClosed bool
}
func newOperations(
updateNegotiationNeededFlagOnEmptyChain *atomicBool,
onNegotiationNeeded func(),
) *operations {
return &operations{
ops: list.New(),
updateNegotiationNeededFlagOnEmptyChain: updateNegotiationNeededFlagOnEmptyChain,
onNegotiationNeeded: onNegotiationNeeded,
}
}
// Enqueue adds a new action to be executed. If there are no actions scheduled,
// the execution will start immediately in a new goroutine. If the queue has been
// closed, the operation will be dropped. The queue is only deliberately closed
// by a user.
func (o *operations) Enqueue(op operation) {
o.mu.Lock()
defer o.mu.Unlock()
_ = o.tryEnqueue(op)
}
// tryEnqueue attempts to enqueue the given operation. It returns false
// if the op is invalid or the queue is closed. mu must be locked by
// tryEnqueue's caller.
func (o *operations) tryEnqueue(op operation) bool {
if op == nil {
return false
}
if o.isClosed {
return false
}
o.ops.PushBack(op)
if o.busyCh == nil {
o.busyCh = make(chan struct{})
go o.start()
}
return true
}
// IsEmpty checks if there are tasks in the queue
func (o *operations) IsEmpty() bool {
o.mu.Lock()
defer o.mu.Unlock()
return o.ops.Len() == 0
}
// Done blocks until all currently enqueued operations are finished executing.
// For more complex synchronization, use Enqueue directly.
func (o *operations) Done() {
var wg sync.WaitGroup
wg.Add(1)
o.mu.Lock()
enqueued := o.tryEnqueue(func() {
wg.Done()
})
o.mu.Unlock()
if !enqueued {
return
}
wg.Wait()
}
// GracefulClose waits for the operations queue to be cleared and forbids
// new operations from being enqueued.
func (o *operations) GracefulClose() {
o.mu.Lock()
if o.isClosed {
o.mu.Unlock()
return
}
// do not enqueue anymore ops from here on
// o.isClosed=true will also not allow a new busyCh
// to be created.
o.isClosed = true
busyCh := o.busyCh
o.mu.Unlock()
if busyCh == nil {
return
}
<-busyCh
}
func (o *operations) pop() func() {
o.mu.Lock()
defer o.mu.Unlock()
if o.ops.Len() == 0 {
return nil
}
e := o.ops.Front()
o.ops.Remove(e)
if op, ok := e.Value.(operation); ok {
return op
}
return nil
}
func (o *operations) start() {
defer func() {
o.mu.Lock()
defer o.mu.Unlock()
// this wil lbe the most recent busy chan
close(o.busyCh)
if o.ops.Len() == 0 || o.isClosed {
o.busyCh = nil
return
}
// either a new operation was enqueued while we
// were busy, or an operation panicked
o.busyCh = make(chan struct{})
go o.start()
}()
fn := o.pop()
for fn != nil {
fn()
fn = o.pop()
}
if !o.updateNegotiationNeededFlagOnEmptyChain.get() {
return
}
o.updateNegotiationNeededFlagOnEmptyChain.set(false)
o.onNegotiationNeeded()
}