const Path = require('path')
const Joi = require('joi')
const Hoek = require('@hapi/hoek')
const Debug = require('debug')
const debug = Debug('dataparty.service.runner-node')
const EndpointContext = require('./endpoint-context')
const DeltaTime = require('../utils/delta-time')
const Router = require('origin-router').Router
const Runner = require('@dataparty/tasker').Runner
class ServiceRunnerNode {
/**
* Unsafe service runner. This service runner uses `eval` to run services, endpoints and tasks.
* This provides only simple context seperation and does not do effective context isolation.
* This should only be used where the service is knwon trustworthy. When the `useNative` option
* is set to true the service will run in the same context as this class with no isolation at all.
* @class module:Service.ServiceRunnerNode
* @link module:Service
* @param {module:Service.IService} options.service The service to load endpoints from
* @param {module:Party.IParty} options.party The party to pass to the endpoints
* @param {boolean} options.sendFullErrors If true send full stack traces to clients. Defaults to false
* @param {string} options.prefix A prefix to apply to all endpoint paths
* @param {Router} options.router Router, defaults to `origin-router`
* @param {boolean} options.useNative
*/
constructor({service, party, sendFullErrors=false, useNative=true, prefix='', router=new Router()}){
this.party = party
this.service = service
this.sendFullErrors = sendFullErrors
this.useNative = useNative
this.middleware = { pre: {}, post: {} }
this.endpoint = {}
this.tasks = {}
this.prefix=prefix
this.router = router
this.taskRunner = new Runner()
this.started = false
this.taskCounter = 0
}
async start(){
if(this.started){return}
debug('starting tasks')
this.started = true
const taskMap = Hoek.reach(this.service, 'compiled.tasks')
for(let name in taskMap){
debug('\t',name)
await this.loadTask(name)
}
debug('tasks ready:')
for(let name in this.tasks){
debug('\t', name)
}
await this.taskRunner.start()
debug('starting endpoints')
const eps = Hoek.reach(this.service, 'compiled.endpoints')
//const endpointsLoading = []
for(let name in eps){
debug('\t',name)
await this.loadEndpoint(name)
//endpointsLoading.push( this.loadEndpoint(name) )
}
//await Promise.all(endpointsLoading)
debug('endpoints ready:')
for(let name in this.endpoint){
debug('\t', Path.join('/', name))
}
}
assertTaskIsValid(name){
if(!this.tasks[name]){
throw new Error('invalid task ['+name+']')
}
}
async loadTask(name){
if(this.tasks[name]){
return
}
debug('loadTask', name, 'useNative =',this.useNative)
let dt = new DeltaTime().start()
"use strict"
let task=null
let TaskClass = null
if(!this.useNative){
const build = Hoek.reach(this.service, `compiled.tasks.${name}`)
TaskClass = eval(build.code/*, build.map*/)
}
else{
TaskClass = this.service.constructors.tasks[name]
}
task = new TaskClass({
context:{
party: this.party,
serviceRunner: this
}
})
debug('task info', TaskClass.info)
this.tasks[name] = task
if(TaskClass.Config.autostart){
this.taskRunner.addTask(task)
}
dt.end()
debug('loaded task',name,'in',dt.deltaMs,'ms')
}
async spawnTask(type, context){
this.assertTaskIsValid(type)
debug('spawnTask', type, 'useNative =',this.useNative)
let dt = new DeltaTime().start()
"use strict"
let task=null
let TaskClass = null
if(!this.useNative){
const build = Hoek.reach(this.service, `compiled.tasks.${type}`)
TaskClass = eval(build.code/*, build.map*/)
}
else{
TaskClass = this.service.constructors.tasks[type]
}
task = new TaskClass({
context:{
party: this.party,
serviceRunner: this,
...context
}
})
task.name = task.name + '-' + this.taskCounter++
debug('task info', TaskClass.info)
this.taskRunner.addTask(task)
dt.end()
debug('spawned task',task.name,'in',dt.deltaMs,'ms')
return task
}
/**
* Add a named task to the run queue
* @see https://github.com/datapartyjs/tasker
* @param {string} name
*/
runTask(name){
this.assertTaskIsValid(name)
const task = this.tasks[name]
this.taskRunner.addTask(task)
}
async loadEndpoint(name){
if(this.endpoint[name]){
return
}
debug('loadEndpoint', name, 'useNative =',this.useNative)
let dt = new DeltaTime().start()
"use strict"
let endpoint=null
if(!this.useNative){
const build = Hoek.reach(this.service, `compiled.endpoints.${name}`)
debug('build', build.code)
var self={}
eval(build.code, build.map)
endpoint = self.Lib
debug('obj Lib', self)
}
else{
endpoint = this.service.constructors.endpoints[name]
}
debug('endpoint', endpoint)
debug('endpoint info', endpoint.info)
await this.checkEndpointConfig(endpoint)
await this.loadEndpointMiddleware(endpoint, 'pre')
await this.loadEndpointMiddleware(endpoint, 'post')
await endpoint.start(this.party)
this.endpoint[name] = endpoint
const routablePath = Path.join(this.prefix, Path.normalize(name))
this.router.add(name, routablePath, this.endpointHandler(endpoint))
dt.end()
debug('loaded endpoint', routablePath,'in',dt.deltaMs,'ms')
}
async loadEndpointMiddleware(endpoint, type='pre'){
const middlewareList = Hoek.reach(endpoint, `info.MiddlewareConfig.${type}`)
for(let name in middlewareList){
const middleware = await this.loadMiddleware(name, type, endpoint)
const middlewareCfg = Hoek.reach(endpoint, `info.MiddlewareConfig.${type}.${name}`)
await this.checkMiddlewareConfig(middleware, middlewareCfg)
}
}
async loadMiddleware(name, type='pre'){
if(this.middleware[type][name]){
//debug('cached',type,'middleware',name)
return this.middleware[type][name]
}
debug('loadMiddleware', type, name, 'useNative =',this.useNative)
let dt = new DeltaTime().start()
const build = Hoek.reach(this.service, `compiled.middleware.${type}.${name}`)
if(this.useNative && !this.service.constructors.middleware[type][name]){
debug(`native middleware ${type} [${name}] does not exist`)
throw new Error(`native middleware ${type} [${name}] does not exist`)
}
if(!this.useNative && (!build || !build.code) ){
debug(`compiled middleware ${type} [${name}] does not exist`)
throw new Error(`compiled middleware ${type} [${name}] does not exist`)
}
let ret = async ()=>{
"use strict"
let middle=null
if(!this.useNative){
let self={}
eval(build.code, build.map)
middle = self.Lib
}
else{
middle = this.service.constructors.middleware[type][name]
}
//debug('middleware info', middle.info)
//await runner.getInfo()
//await runner.start(this.party)
await middle.start(this.party)
this.middleware[type][name] = middle
dt.end()
debug('loaded',type,'middleware',name,'in',dt.deltaMs,'ms')
return middle
}
return await ret()
}
async checkEndpointConfig(endpoint){
//! check basic structure {pre: Object, post: Object}
return await Joi.object().keys({
pre: Joi.object().keys(null),
post: Joi.object().keys(null)
})
.validateAsync(endpoint.info.MiddlewareConfig)
}
async checkMiddlewareConfig(middleware, middlewareCfg){
//! check endpoint configures middleware correctly
return await middleware.info.ConfigSchema.validateAsync(middlewareCfg)
}
/**
* Expressjs style way of calling an endpoint. The req will be passed to the router to select the appropritate endpoint
* @method module:Service.ServiceRunnerNode.onRequest
* @param {Express.Request} req
* @param {Express.Response} res
* @returns
*/
async onRequest(req, res){
debug('onRequest')
debug('req', req.method, req.hostname,'-', req.url, req.ips, req.body)
let route = await this.router.route(req, res)
debug('req done')
if(!route){
res.status(404).end()
return
}
}
endpointHandler(endpoint){
return async (event)=>{
debug('event',event.method, event.pathname, event.request.ip, event.request.ips)
const context = new EndpointContext({
req: event.request, res: event.response,
endpoint,
party: this.party,
input: event.request.body,
debug: Debug,
sendFullErrors: this.sendFullErrors
})
debug('running', endpoint.info.Name)
const middlewareCfg = Hoek.reach(endpoint, 'info.MiddlewareConfig')
let phase = 'pre-middleware'
try{
await this.runMiddleware(middlewareCfg, context, 'pre')
phase = 'endpoint'
const result = await endpoint.run(context, {Package: this.service.compiled.package})
phase = 'output'
context.setOutput(result)
phase = 'post-middleware'
await this.runMiddleware(middlewareCfg, context, 'post')
phase = 'send'
context.dt.end()
/*debug('ctx.log', context._debugContent)*/
debug('ran endpoint', endpoint.info.Name, 'in', context.dt.deltaMs, 'ms')
debug('result', context.output)
context.res.send(context.output)
}
catch(err){
if(this.sendFullErrors){
debug('caught error', err)
}
else{
debug('caught error (', err.message, ')')
}
context.dt.end()
debug('crashed (',endpoint.info.Name,') in', context.dt.deltaMs, 'ms')
context.res.status(500).send({
error: {
code: err.code,
message: err.message,
phase,
stack: (!context.sendFullErrors ? undefined : err.stack),
... (!context.sendFullErrors ? null : err)
}
})
}
}
}
async runMiddleware(middlewareCfg, ctx, type='pre'){
debug(`run ${type} middleware`)
const cfg = Hoek.reach(middlewareCfg, type)
const order = Hoek.reach(this.service, 'compiled.middleware_order.'+type)
debug('\tmiddleware order', order)
for(let name of order){
const info = Hoek.reach(cfg, name)
if(!info){ continue }
debug('\t\trunning', name)
const middleware = Hoek.reach(this.middleware, `${type}.${name}`)
const dt = new DeltaTime().start()
await middleware.run(ctx, {Config:info})
dt.end()
debug('runMiddleware(',type,name,') in', dt.deltaMs, 'ms')
}
}
}
module.exports = ServiceRunnerNode