2023-07-05 13:59:39 +00:00
package async
2023-06-22 11:28:35 +00:00
import (
"context"
"errors"
"fmt"
"sync"
orderedmap "github.com/wk8/go-ordered-map/v2"
)
var ErrTaskOverwritten = errors . New ( "task overwritten" )
2024-08-16 08:11:18 +00:00
// Scheduler ensures that only one task of a type is running at a time.
2023-06-22 11:28:35 +00:00
type Scheduler struct {
queue * orderedmap . OrderedMap [ TaskType , * taskContext ]
queueMutex sync . Mutex
context context . Context
cancelFn context . CancelFunc
doNotDeleteCurrentTask bool
}
type ReplacementPolicy = int
const (
2024-01-08 21:24:30 +00:00
// ReplacementPolicyCancelOld for when the task arguments might change the result
2023-06-22 11:28:35 +00:00
ReplacementPolicyCancelOld ReplacementPolicy = iota
2024-01-08 21:24:30 +00:00
// ReplacementPolicyIgnoreNew for when the task arguments doesn't change the result
2023-06-22 11:28:35 +00:00
ReplacementPolicyIgnoreNew
)
type TaskType struct {
2023-07-24 22:54:53 +00:00
ID int64
2023-06-22 11:28:35 +00:00
Policy ReplacementPolicy
}
type taskFunction func ( context . Context ) ( interface { } , error )
type resultFunction func ( interface { } , TaskType , error )
type taskContext struct {
taskType TaskType
policy ReplacementPolicy
taskFn taskFunction
resFn resultFunction
}
func NewScheduler ( ) * Scheduler {
return & Scheduler {
queue : orderedmap . New [ TaskType , * taskContext ] ( ) ,
}
}
// Enqueue provides a queue of task types allowing only one task at a time of the corresponding type. The running task is the first one in the queue (s.queue.Oldest())
//
// Schedule policy for new tasks
// - pushed at the back of the queue (s.queue.PushBack()) if none of the same time already scheduled
// - overwrite the queued one of the same type, depending on the policy
// - In case of ReplacementPolicyIgnoreNew, the new task will be ignored
// - In case of ReplacementPolicyCancelOld, the old running task will be canceled or if not yet run overwritten and the new one will be executed when its turn comes.
//
// The task function (taskFn) might not be executed if
// - the task is ignored
// - the task is overwritten. The result function (resFn) will be called with ErrTaskOverwritten
//
// The result function (resFn) will always be called if the task is not ignored
func ( s * Scheduler ) Enqueue ( taskType TaskType , taskFn taskFunction , resFn resultFunction ) ( ignored bool ) {
s . queueMutex . Lock ( )
defer s . queueMutex . Unlock ( )
taskRunning := s . queue . Len ( ) > 0
existingTask , typeInQueue := s . queue . Get ( taskType )
2024-08-16 08:11:18 +00:00
// we need wrap the original resFn to ensure it is called only once
// otherwise, there's a chance that it will be called twice if we
// call Stop() quickly after Enqueue while task is running
var invokeResFnOnce sync . Once
onceResFn := func ( res interface { } , taskType TaskType , err error ) {
invokeResFnOnce . Do ( func ( ) {
resFn ( res , taskType , err )
} )
}
2023-06-22 11:28:35 +00:00
newTask := & taskContext {
taskType : taskType ,
policy : taskType . Policy ,
taskFn : taskFn ,
2024-08-16 08:11:18 +00:00
resFn : onceResFn ,
2023-06-22 11:28:35 +00:00
}
if taskRunning {
if typeInQueue {
if s . queue . Oldest ( ) . Value . taskType == taskType {
// If same task type is running
if existingTask . policy == ReplacementPolicyCancelOld {
// If a previous task is running, cancel it
if s . cancelFn != nil {
s . cancelFn ( )
s . cancelFn = nil
} else {
// In case of multiple tasks of the same type, the previous one is overwritten
go func ( ) {
existingTask . resFn ( nil , existingTask . taskType , ErrTaskOverwritten )
} ( )
}
s . doNotDeleteCurrentTask = true
// Add it again to refresh the order of the task
s . queue . Delete ( taskType )
s . queue . Set ( taskType , newTask )
} else {
ignored = true
}
} else {
// if other task type is running
// notify the queued one that it is overwritten or ignored
if existingTask . policy == ReplacementPolicyCancelOld {
2024-08-16 12:18:09 +00:00
oldResFn := existingTask . resFn
2023-06-22 11:28:35 +00:00
go func ( ) {
2024-08-16 12:18:09 +00:00
oldResFn ( nil , existingTask . taskType , ErrTaskOverwritten )
2023-06-22 11:28:35 +00:00
} ( )
// Overwrite the queued one of the same type
existingTask . taskFn = taskFn
2024-08-16 08:11:18 +00:00
existingTask . resFn = onceResFn
2023-06-22 11:28:35 +00:00
} else {
ignored = true
}
}
} else {
// Policy does not matter for the fist enqueued task of a type
s . queue . Set ( taskType , newTask )
}
} else {
// If no task is running add and run it. The worker will take care of scheduling new tasks added while running
s . queue . Set ( taskType , newTask )
existingTask = newTask
s . runTask ( existingTask , taskFn , func ( res interface { } , runningTask * taskContext , err error ) {
2024-08-16 08:11:18 +00:00
s . finishedTask ( res , runningTask , onceResFn , err )
2023-06-22 11:28:35 +00:00
} )
}
return ignored
}
func ( s * Scheduler ) runTask ( tc * taskContext , taskFn taskFunction , resFn func ( interface { } , * taskContext , error ) ) {
thisContext , thisCancelFn := context . WithCancel ( context . Background ( ) )
s . cancelFn = thisCancelFn
s . context = thisContext
go func ( ) {
res , err := taskFn ( thisContext )
// Release context resources
thisCancelFn ( )
if errors . Is ( err , context . Canceled ) {
resFn ( res , tc , fmt . Errorf ( "task canceled: %w" , err ) )
} else {
resFn ( res , tc , err )
}
} ( )
}
// finishedTask is the only one that can remove a task from the queue
2023-08-11 17:28:46 +00:00
// if the current running task completed (doNotDeleteCurrentTask is true)
func ( s * Scheduler ) finishedTask ( finishedRes interface { } , doneTask * taskContext , finishedResFn resultFunction , finishedErr error ) {
2023-06-22 11:28:35 +00:00
s . queueMutex . Lock ( )
current := s . queue . Oldest ( )
// Delete current task if not overwritten
if s . doNotDeleteCurrentTask {
s . doNotDeleteCurrentTask = false
} else {
2024-08-16 08:11:18 +00:00
// current maybe nil if Stop() is called
if current != nil {
s . queue . Delete ( current . Value . taskType )
}
2023-06-22 11:28:35 +00:00
}
// Run next task
if pair := s . queue . Oldest ( ) ; pair != nil {
nextTask := pair . Value
s . runTask ( nextTask , nextTask . taskFn , func ( res interface { } , runningTask * taskContext , err error ) {
s . finishedTask ( res , runningTask , runningTask . resFn , err )
} )
} else {
s . cancelFn = nil
}
s . queueMutex . Unlock ( )
// Report result
2023-08-11 17:28:46 +00:00
finishedResFn ( finishedRes , doneTask . taskType , finishedErr )
2023-06-22 11:28:35 +00:00
}
func ( s * Scheduler ) Stop ( ) {
s . queueMutex . Lock ( )
defer s . queueMutex . Unlock ( )
if s . cancelFn != nil {
s . cancelFn ( )
s . cancelFn = nil
}
// Empty the queue so the running task will not be restarted
for pair := s . queue . Oldest ( ) ; pair != nil ; pair = pair . Next ( ) {
// Notify the queued one that they are canceled
if pair . Value . policy == ReplacementPolicyCancelOld {
2024-08-15 12:46:36 +00:00
go func ( val * taskContext ) {
val . resFn ( nil , val . taskType , context . Canceled )
} ( pair . Value )
2023-06-22 11:28:35 +00:00
}
s . queue . Delete ( pair . Value . taskType )
}
}