fix_: scheduler panic after quick stop
This commit is contained in:
parent
da70d6f1b5
commit
d3b1999963
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
var ErrTaskOverwritten = errors.New("task overwritten")
|
||||
|
||||
// Scheduler ensures that only one task of a type is running at a time.
|
||||
type Scheduler struct {
|
||||
queue *orderedmap.OrderedMap[TaskType, *taskContext]
|
||||
queueMutex sync.Mutex
|
||||
|
@ -71,11 +72,20 @@ func (s *Scheduler) Enqueue(taskType TaskType, taskFn taskFunction, resFn result
|
|||
taskRunning := s.queue.Len() > 0
|
||||
existingTask, typeInQueue := s.queue.Get(taskType)
|
||||
|
||||
// we need wrap the original resFn to ensure it is called only once
|
||||
// otherwise, there's a chance that it will be called twice if we
|
||||
// call Stop() quickly after Enqueue while task is running
|
||||
var invokeResFnOnce sync.Once
|
||||
onceResFn := func(res interface{}, taskType TaskType, err error) {
|
||||
invokeResFnOnce.Do(func() {
|
||||
resFn(res, taskType, err)
|
||||
})
|
||||
}
|
||||
newTask := &taskContext{
|
||||
taskType: taskType,
|
||||
policy: taskType.Policy,
|
||||
taskFn: taskFn,
|
||||
resFn: resFn,
|
||||
resFn: onceResFn,
|
||||
}
|
||||
|
||||
if taskRunning {
|
||||
|
@ -111,7 +121,7 @@ func (s *Scheduler) Enqueue(taskType TaskType, taskFn taskFunction, resFn result
|
|||
}()
|
||||
// Overwrite the queued one of the same type
|
||||
existingTask.taskFn = taskFn
|
||||
existingTask.resFn = resFn
|
||||
existingTask.resFn = onceResFn
|
||||
} else {
|
||||
ignored = true
|
||||
}
|
||||
|
@ -125,7 +135,7 @@ func (s *Scheduler) Enqueue(taskType TaskType, taskFn taskFunction, resFn result
|
|||
s.queue.Set(taskType, newTask)
|
||||
existingTask = newTask
|
||||
s.runTask(existingTask, taskFn, func(res interface{}, runningTask *taskContext, err error) {
|
||||
s.finishedTask(res, runningTask, resFn, err)
|
||||
s.finishedTask(res, runningTask, onceResFn, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -156,13 +166,15 @@ func (s *Scheduler) runTask(tc *taskContext, taskFn taskFunction, resFn func(int
|
|||
func (s *Scheduler) finishedTask(finishedRes interface{}, doneTask *taskContext, finishedResFn resultFunction, finishedErr error) {
|
||||
s.queueMutex.Lock()
|
||||
|
||||
// We always have a running task
|
||||
current := s.queue.Oldest()
|
||||
// Delete current task if not overwritten
|
||||
if s.doNotDeleteCurrentTask {
|
||||
s.doNotDeleteCurrentTask = false
|
||||
} else {
|
||||
s.queue.Delete(current.Value.taskType)
|
||||
// current maybe nil if Stop() is called
|
||||
if current != nil {
|
||||
s.queue.Delete(current.Value.taskType)
|
||||
}
|
||||
}
|
||||
|
||||
// Run next task
|
||||
|
|
|
@ -392,3 +392,35 @@ func TestScheduler_Enqueue_InResult(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestScheduler_Enqueue_Quick_Stop(t *testing.T) {
|
||||
scheduler := NewScheduler()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
longRunningTask := func(ctx context.Context) (interface{}, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// we should reach here rather than other condition branch as Stop() canceled the context quickly
|
||||
wg.Done()
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(10 * time.Second):
|
||||
wg.Done()
|
||||
return "task completed", nil
|
||||
}
|
||||
}
|
||||
|
||||
resFn := func(res interface{}, taskType TaskType, err error) {
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
scheduler.Enqueue(TaskType{ID: 1, Policy: ReplacementPolicyCancelOld}, longRunningTask, resFn)
|
||||
|
||||
require.NotPanics(t, func() {
|
||||
scheduler.Stop()
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue