const Dependency = require('dependency-solver')
const debug = require('debug')('Tasker.Runner')
const verbose = require('debug')('Tasker.Runner.verbose')
const {JSONPath} = require('jsonpath-plus')
const EventEmitter = require('eventemitter3')
/**
* Class representing a task runner.
*/
class Runner extends EventEmitter {
/**
* Create a task runner
* @param {Number} [options.parallel=10] Number of parallel foreground tasks can be run in parallel
* @param {Number} [options.restartDelayMs=5000] Number of milliseconds to wait before restarting a task during task resets
* @param {Number} [options.planningIntervalMs=100] Number of milliseconds between planning watchdog timer
*/
constructor({parallel=10, restartDelayMs=5000, planningIntervalMs=100}={}){
super()
this.holding = {}
this.pending = {}
this.running = {}
this.success = {}
this.failure = {}
this.background = {}
this.parallel = parallel || 10
this.taskOrder = []
this.started = false
this._noWorkCount = 0
this._runWatchdog = undefined
this._planningInterval = planningIntervalMs || 100
this._restartDelay = restartDelayMs || 5000
}
/**
* Start task runner
* @fires Runner#running
*/
async start(){
this.started = true
if(this._runWatchdog){ return }
this.runTasks()
//this._runWatchdog = setTimeout(this.runTasks.bind(this), this._planningInterval)
/**
* Running event.
*
* @event Runner#running
*/
this.emit('running')
}
/**
* Stop running tasks. Calls Task.cancel on all running and pending tasks. Then resets all tasks.
*/
async stop(){
debug('stopping')
this.printTaskLists()
this.started = false
clearTimeout(this._runWatchdog)
this._runWatchdog = undefined
const taskList = this.tasks
if(taskList && this.hasWork()){
debug('cancelling incomplete tasks')
const nameList = Object.keys(taskList)
for(let name of nameList){
if(!this.isDone(name)){
let task = taskList[name]
task.off('pre-success', this.onPreSuccess.bind(this))
task.off('pre-failure', this.onPreFailure.bind(this))
await task.cancel()
}
}
}
if(taskList){
Object.keys(taskList).map(name=>{
this.resetTask(name)
})
}
this.holding = {}
this.pending = {}
this.running = {}
this.success = {}
this.failure = {}
this.background = {}
this.taskOrder = []
this.started = false
this._noWorkCount = 0
}
/**
* A collection mapping task names to Task instances.
*/
get tasks(){
return Object.assign(
{},
this.holding,
this.pending,
this.running,
this.background,
this.success,
this.failure
)
}
/** An array of tasks in the order initially added. */
get depends(){
//let graph = {}
const taskMap = this.tasks
let taskList = []
for(let taskName in taskMap){
const task = taskMap[taskName]
taskList.push( task )
//verbose(taskName)
//graph[taskName+''] = task.depends
}
//verbose(JSON.stringify(taskList))
return taskList
}
/** List of task names in dependency resolved order */
get runOrder(){
const depends = this.depends
let taskOrder = []
const tasksWithDepends = JSONPath(
'$.graph[?(@.depends.length >0)].name',
{graph: depends}
)
const tasksWithoutDepends = JSONPath(
'$.graph[?(@.depends.length ==0)].name',
{graph: depends}
)
taskOrder = taskOrder.concat(tasksWithoutDepends)
//verbose('depends', JSON.stringify(tasksWithDepends))
//verbose('no depends', JSON.stringify(tasksWithoutDepends))
//verbose('taskOrder-prelim', taskOrder)
if(tasksWithDepends && tasksWithDepends.length > 0){
const graph = {}
tasksWithDepends.map((taskName)=>{
const task = this.tasks[taskName]
graph[taskName]=task.depends
})
const solved = Dependency.solve(graph)
//verbose('solved', solved)
for(let taskName of solved){
if(taskOrder.indexOf(taskName) < 0 ){
taskOrder.push( taskName )
}
}
}
verbose('taskOrder', taskOrder)
this.taskOrder = taskOrder
return taskOrder
}
/**
* Get collection of task results
* @param {string[]} nameList List of task results to lookup
* @returns {any}
*/
collectResults(nameList){
const taskList = this.tasks
let results = {}
nameList.map((taskName)=>{ results[taskName] = taskList[taskName] })
return results
}
/**
* Execute tasks
* @fires Runner#idle
*/
runTasks(){
verbose('runTasks')
if(!this.hasWork()){
verbose('no work')
this._noWorkCount++
if(this._noWorkCount >= 2){
clearTimeout(this._runWatchdog)
this.runningCount = undefined
/**
* Idle event.
*
* @event Runner#idle
*/
this.emit('idle')
return
}
} else {
this._noWorkCount = 0
}
const order = this.runOrder
const taskList = this.tasks
let runningCount = Object.keys(this.running).length
verbose('Running ',runningCount,'out of',this.parallel)
if(runningCount >= this.parallel){
if(this.started){
this._runWatchdog = setTimeout(this.runTasks.bind(this), this._planningInterval)
}
return
}
for(let taskName of order){
verbose('review task', taskName)
let task = taskList[taskName]
runningCount = Object.keys(this.running).length
try{
if(this.canRun(task)){
verbose('\t\tcanRun - true')
switch(this.taskState(taskName)){
case 'holding':
this.setTaskState(taskName, 'pending')
break;
case 'pending':
if(runningCount >= this.parallel) { continue }
if(! this.allDone(task.depends)) { continue }
this.setTaskState(taskName, (!task.background ? 'running': 'background'))
task.once('pre-success', this.onPreSuccess.bind(this))
task.once('pre-failure', this.onPreFailure.bind(this))
task.run(this.collectResults(task.depends)).catch(err=>{
debug('error while running task -', taskName)
this.printTaskLists()
return Promise.resolve()
})
break;
default:
break
}
}
else{
verbose('\t\tcanRun == FALSE')
}
} catch (err) {
debug('failed to run task -', taskName, 'error -', err)
}
}
runningCount = Object.keys(this.running).length
verbose('Running ',runningCount,'out of',this.parallel)
//this.printTaskLists()
if(this.started){
this._runWatchdog = setTimeout(this.runTasks.bind(this), this._planningInterval)
}
}
/**
* Print task list for debugging. Must enable debugging
* @private
*/
printTaskLists(){
let queues = ['holding','pending','running', 'background', 'success','failure']
for(let queueName of queues){
let queue = this[queueName]
if(!queue){
debug('queue - ', queueName, null)
continue
}
debug('queue - ', queueName, 'length', Object.keys(queue).length)
for(let taskName in queue){
let task = queue[taskName]
debug('\t\ttask - ', taskName, task.failure)
}
}
debug(this.taskOrder)
}
/**
* @fires Runner#task-done
* @fires Runner#task-success
* @private
*/
onPreSuccess(task){
verbose('Success - ', task.name)
task.off('pre-success', this.onPreSuccess.bind(this))
task.off('pre-failure', this.onPreFailure.bind(this))
this.setTaskState(task.name, 'success')
/**
* Task done event.
*
* @event Runner#task-done
* @type {Task}
*/
this.emit('task-done', task)
/**
* Task success event.
*
* @event Runner#task-success
* @type {Task}
*/
this.emit('task-success', task)
}
/**
* @fires Runner#task-done
* @fires Runner#task-failure
* @private
*/
onPreFailure(task){
verbose('Failure - ', task.name, task.failure)
task.off('pre-success', this.onPreSuccess.bind(this))
task.off('pre-failure', this.onPreFailure.bind(this))
this.setTaskState(task.name, 'failure')
this.emit('task-done', task)
/**
* Task failure event.
*
* @event Runner#task-failure
* @type {Task}
*/
this.emit('task-failure', task)
if(task.background && !task._cancel){
this.restartTask(task.name)
}
}
/**
* Call the reset function on the named task and reschedule task.
* @param {string} taskName
* @param {Number} timeout Timeout ms. Defaults to restartDelayMs provided in constructor
*/
restartTask(taskName, timeout){
debug('restarting task - ', taskName, 'in', timeout||this._restartDelay, 'ms')
setTimeout(async ()=>{
let task = this.getTask(taskName)
if(!task){return}
await task.reset()
this.setTaskState(taskName)
this.addTask(task)
if(this.started && this._runWatchdog == undefined){ this.start() }
}, timeout||this._restartDelay)
}
/**
* Call the reset function on the named task
* @param {string} taskName
* @param {Number} timeout Timeout ms. Defaults to restartDelayMs provided in constructor
*/
resetTask(taskName, timeout){
debug('resetting task - ', taskName, 'in', timeout||this._restartDelay, 'ms')
setTimeout(async ()=>{
let task = this.getTask(taskName)
if(!task){return}
await task.reset()
this.setTaskState(taskName)
}, timeout||this._restartDelay)
}
/**
* Check if the there are pending or running tasks
* @returns bool
*/
hasWork(){
const queueList = ['holding', 'pending', 'running', 'background']
for(let queueName of queueList){
const queue = this[queueName]
if(Object.keys(queue).length > 0 ){
verbose('hasWork == true - ', queueName)
return true
}
}
verbose('hasWork == false')
return false
}
/**
* Check if the task list is running
* @param {string} taskName
* @returns bool
*/
isRunning(taskName){
let state = this.taskState(taskName)
return 'running' === state || 'background' === state
}
/**
* Check if the task list is pending
* @param {string} taskName
* @returns bool
*/
isPending(taskName){
let state = this.taskState(name)
return 'pending' === state || 'holding' === state
}
/**
* Check if the task list is done
* @param {string} taskName
* @returns bool
*/
isDone(taskName){
let state = this.taskState(taskName)
return (['success', 'failure'].indexOf(state) > -1)
}
/**
* Check if the entire task list is complete
* @param {string[]} taskList
* @returns bool
*/
allDone(taskList){
for(let taskName of taskList){
if(!this.isDone(taskName)){
verbose('not done', taskName)
return false
}
}
return true
}
/**
* @typedef {'holding' |'pending' |'running' | 'background' | 'success' |'failure'} TaskState
*/
/**
* Lookup task state.
* @param {string} name Task name
* @returns {TaskState}
*/
taskState(name){
let queueNames = ['holding','pending','running', 'background', 'success','failure']
for(let queueName of queueNames){
let queue = this[queueName]
if(!queue){continue}
for(let taskName of Object.keys(queue)){
if(taskName == name){
return queueName
}
}
}
throw new Error('findTask - Task ['+name+'] not found')
}
/**
* Change task state
*
* @param {string} taskName
* @param {string} state New task state
* @private
*/
setTaskState(taskName, state){
if(['holding','pending','running','background','success','failure', undefined].indexOf(state) < 0){
throw new Error('setTaskState - Invalid state['+state+']')
}
let currentState = this.taskState(taskName)
if(currentState == state ){ return }
let currentQueue = this[ currentState ]
let task = currentQueue[taskName]
currentQueue[taskName] = undefined
delete currentQueue[taskName]
if(!state){ return }
this[state][taskName] = task
verbose('setTaskState - task ['+task.name+'] is '+state)
}
/**
* Can the task be run
* @param {Tasker.Task} task
* @returns bool
*/
canRun(task){
try{
let ready = this.allDone(task.depends)
}
catch(err){
verbose('false due to error ', task)
return false
}
return true
}
/**
* Does task exist in runner?
* @param {string} taskName
* @returns bool
*/
exists(taskName){
return this.tasks[taskName] !== undefined
}
/**
* Add task to run queue
* @param {Tasker.Task} task
* @returns {Tasker.Task}
*/
async addTask(task){
if(this.exists(task.name)){
throw new Error('duplicate task name ['+task.name+']')
}
if(this.canRun(task)){
debug('addTask - task ['+task.name+'] is pending')
this.pending[task.name] = task
} else {
debug('addTask - task ['+task.name+'] is holding')
this.holding[task.name] = task
}
if(this.started && this._runWatchdog == undefined){ this.start() }
//return task.promise
return task
}
/**
* Lookup task instance
* @param {string} taskName
* @returns {Tasker.Task}
*/
getTask(taskName){
return this.tasks[taskName]
}
/**
* Cancel task, returns the result of the Tasker.Task.cancel() function.
* @param {string} taskName
* @returns {Promise<any>}
*/
async cancelTask(taskName){
let task = await this.tasks[taskName]
return task.cancel()
}
}
module.exports = Runner