comms_peer-comms.js

const debug = require('debug')('dataparty.comms.peercomms')
const uuidv4 = require('uuid/v4')
const HttpMocks = require('node-mocks-http')

const SocketOp = require('./op/socket-op')
const ISocketComms = require('./isocket-comms')

const Joi = require('joi')
const HostOp = require('./host/host-op')
const HostProtocolScheme = require('./host/host-protocol-scheme')

const AUTH_TIMEOUT_MS = 3000

const HOST_SESSION_STATES = {
  AUTH_REQUIRED: 'AUTH_REQUIRED',
  AUTHED: 'AUTHED',
  SERVER_CLOSED: 'SERVER_CLOSED',
  CLIENT_CLOSED: 'CLIENT_CLOSED'
}


function truncateString(str, num) {

  if(!str){return ''}
  
  if(typeof str != 'string'){
    str = str.toString()
  }

  let length = str.length

  if (str.length <= num) {
    return str
  }
  return str.slice(0, num) + '...' + (length-num) + 'more bytes'
}


/**
 * @class module:Comms.PeerComms
 * @implements {module:Comms.ISocketComms}
 * @extends {module:Comms.ISocketComms}
 * @link module:Comms
 * 
 * 
 * @param {boolean}     host    Set to `true` to make this peer act as the protocol host
 * @param {Object}      socket  Already connected peer socket
 */
class PeerComms extends ISocketComms {
  constructor({remoteIdentity, discoverRemoteIdentity, host, party, socket, ...options}){
    super({remoteIdentity, discoverRemoteIdentity, party, ...options})

    this.uuid = uuidv4()
    this.socket = socket || null

    this.host = host   //! Is comms host\
    this.oncall = null

    this._host_auth_timeout = null

    if(this.host){
      this.state = PeerComms.STATES.AUTH_REQUIRED
      this.session = undefined
      this.identity = undefined
      this.actor = undefined
    }

    this.pending_calls = 0
  }

  setState(state) {
    this.state = state
    this.emit('state', this.state)
  }

  static get STATES() {
    return HOST_SESSION_STATES
  }

  async handleClientCall(message){

    debug('handleClientCall - pending calls - ', this.pending_calls)

    this.pending_calls++
    try{

      let response = null
      let request = await this.decrypt( {data: message}, this.remoteIdentity )
      debug('handleHostCall', truncateString(request, 1024))

      let inputValidated

      if (this.state === PeerComms.STATES.AUTHED) {

        if(typeof request != 'object'){
          request = JSON.parse(request)
        }

        debug('handling authed call')
        inputValidated = HostProtocolScheme.ANY_OP.validate(request)
      } else if (this.state === PeerComms.STATES.AUTH_REQUIRED) {
        debug('handling non-authed call')
        inputValidated = HostProtocolScheme.AUTH_OP.validate(request)
      } else {
        throw new Error(
          'Recieved input in unexpected session state [',
          this.state,
          ']'
        )
      }

      if(inputValidated.error !== undefined){
        throw inputValidated.error
      }

      //debug('original input ->', typeof request, request)
      //debug('validated input ->', inputValidated)
      const op = new HostOp({ msg: message, input: inputValidated.value })

      /*debug('session id : ', op.input.session, this.session)

      if (this.session && op.input.session === this.session.id) {
        debug('session id MATCH')
      }*/

      op.once('finished', state => {
        const response = {
          op: 'status',
          id: op.id,
          level: op.level,
          state: op.state,
          stats: {
            start: op.start,
            end: op.end,
            duration_ms: op.end - op.start
          },
          ...op.result 
        }

        debug('finished', response.id, response.state, response.stats.duration_ms, 'ms')

        this.send(response)
      })


      
      await this.authorizeOperation(op)

    } catch (err) {
      debug('EXCEPTION ->', err)
    }
    this.pending_calls--
  }

  

  async handleClientConnection(){
    debug('handleClientConnection')

    this._host_auth_timeout = setTimeout(
      this.handleAuthTimeout.bind(this),
      AUTH_TIMEOUT_MS
    )
  }



  async handleAuthTimeout(){
    clearTimeout(this._host_auth_timeout)
    this._host_auth_timeout = null
    if(!this.authed){
      debug('handleAuthTimeout - timed out')
      this.authed = false
      await this.stop()
    }
  }

  async handleMessage(message){
    debug('handleMessage', truncateString(message.toString(), 1024) )

    this.onmessage({data: message})
  }

  async call(path, data, force=false){
    if(this.host && !this.force){ throw new Error('host-not-allowed-call') }
    if(!this.authed){ throw new Error('not authed') }

    if (!this.party.hasIdentity()) {
      throw new Error('identity required')
    }

    let callOp = new SocketOp( 'peer-call', { endpoint: path, data }, this )

    debug('running peer-call endpoint =', path, truncateString(data, 1024))

    const reply = await callOp.run()

    return reply.result
  }

  async start(){
    debug('start')
    if(this.socketInit){
      await this.socketInit()
    }
    
    this.socket.on('close', this.stop.bind(this))

    if(this.host){
      debug('host mode comms')

      this.socket.once('connect', this.handleClientConnection.bind(this))
      this.socket.on('data', this.handleClientCall.bind(this))
    }
    else{
      debug('client mode comms')
      this.socket.once('connect', this.onopen.bind(this))
      this.socket.on('data', this.handleMessage.bind(this))
    }

    if(this.socketStart){
      await this.socketStart()
    }
  }

  async stop(){
    debug('stop')
    this.close()
  }

  async close(){
    debug('close', this.uuid)

    if(this.party.topics){
      await this.party.topics.destroyNode(this)
    }

    debug('closing connection')
    this.socket.destroy()

    this.onclose()
  }


  async authorizeOperation(op) {

    //debug('Here\'s op', op)
    
    //debug('state : ', this.state)

    //console.log(op.input)

    if (op.op === 'auth' && this.state === PeerComms.STATES.AUTH_REQUIRED) {
    
      debug('handling auth op')
      return this.handleAuthOp(op)
    
    } else if (op.op === 'peer-call' && this.state === PeerComms.STATES.AUTHED) {

      return this.handleCallOp(op)

    } else if (op.op === 'advertise' && this.state === PeerComms.STATES.AUTHED) {

      if(this.party.topics){
        await this.party.topics.advertise(this, op.input.topic)
        op.setState(HostOp.STATES.Finished_Success)
      }
      else{
        op.setState(HostOp.STATES.Finished_Fail)
      }

    } else if (op.op === 'subscribe' && this.state === PeerComms.STATES.AUTHED) {

      if(this.party.topics){
        await  this.party.topics.subscribe.bind(this.party.topics)(this, op.input.topic)
        op.setState(HostOp.STATES.Finished_Success)
      }
      else{
        op.setState(HostOp.STATES.Finished_Fail)
      }

    } else if (op.op === 'unsubscribe' && this.state === PeerComms.STATES.AUTHED) {

      if(this.party.topics){
        await this.party.topics.unsubscribe(this, op.input.topic)
        op.setState(HostOp.STATES.Finished_Success)
      }
      else{
        op.setState(HostOp.STATES.Finished_Fail)
      }

    } else if (op.op === 'publish' && this.state === PeerComms.STATES.AUTHED) {

      if(this.party.topics){
        await this.party.topics.publish(this, op.input.topic, op.input.msg)
        op.setState(HostOp.STATES.Finished_Success)
      }
      else{
        op.setState(HostOp.STATES.Finished_Fail)
      }

    } else {
      debug('⚠️ op not implemented ⚠️')
      debug(op.input)

      op.result='not implemented'
      op.setState(HostOp.STATES.Finished_Fail)
    }
  }

  async handleAuthOp(op){
    
    debug('allowing client - ', this.remoteIdentity)

    clearTimeout(this._host_auth_timeout)
    this._host_auth_timeout = null

    this.authed = true
    this.setState(PeerComms.STATES.AUTHED)
    op.setState(HostOp.STATES.Finished_Success)

    this.emit('open')
    return 
  }

  async handleCallOp(op){
    debug('peer-call', op.input.endpoint)

    if(this.party.hostRunner){

      debug('calling runner')

      if(op.input.endpoint == 'api-v2-peer-bouncer'){
        debug('ask->', truncateString(op.input.data, 1024))
        op.result = {result: await this.party.handleCall(op.input.data) }

        op.setState(HostOp.STATES.Finished_Success)
        return
      }

      const req = HttpMocks.createRequest({
        method: 'GET',
        url: '/'+op.input.endpoint,
        body: (op.input.data) ? JSON.parse(op.msg.toString()) : undefined
      })

      const res = HttpMocks.createResponse()

      debug('\tthe request', req)

      debug('req ip type', typeof req.ip)

      const route = this.party.hostRunner.router.get(op.input.endpoint)

      debug('route',route)

      debug('call route', await route._events.route({
        method: req.method,
        pathname: req.url,
        request: req,
        response: res
      }))

      op.result = {result: res._getData() }

      debug('got result', op.result)

      op.setState(HostOp.STATES.Finished_Success)
      return

    } else if(op.input.endpoint == 'api-v2-peer-bouncer'){
      
      debug('ask->',op.input.data)
      op.result = {result: await this.party.handleCall(op.input.data) }

      op.setState(HostOp.STATES.Finished_Success)

      return
    }
  }
}


module.exports = PeerComms