topics_local-topic-host.js

'use strict'

const Path = require('path')
const debug = require('debug')('dataparty.topics.LocalTopicHost')

const HostTopic = require('./host-topic')
const PeerNode = require('./peer-node')

class LocalTopicHost {

  /**
   * Implementation of ROS style pub/sub topics. This runs on a
   * single thread/peer and can be shared to other peers. However,
   * this is a centralized implementation. So if the hosting node
   * stops all topic traffic will halt as well.
   * @class module:Topics.LocalTopicHost
   * @link module:Topics
   */
  constructor(){
    debug('constructor')
    this.nodesByUuid = new Map()
    this.topicsByPath = new Map()
  }

  getTopic(path, create=true){
    const normalized = Path.normalize(path)
    let topic = this.topicsByPath.get(normalized)

    //debug('get topic', topic, normalized)

    if(!topic && create == true){
      //topic = 
      this.topicsByPath.set(normalized, new HostTopic(normalized))

      //debug('set topic', this.topicsByPath.get(normalized), normalized)
      return this.topicsByPath.get(normalized)
    }

    return topic
  }

  getNodeByUuid(uuid, peer){
    let node = this.nodesByUuid.get(uuid)

    if(!node && peer){
      node = new PeerNode(peer)
      this.nodesByUuid.set(uuid, node)
    }

    return node
  }

  getNodeByPeer(peer){
    return this.getNodeByUuid(peer.uuid, peer)
  }

  async advertise(peer, path){
    const topic = this.getTopic(path)
    const node = this.getNodeByPeer(peer)

    debug('advertise', path, peer.uuid)

    topic.advertise(node)
    node.advertise(topic)
  }

  async subscribe(peer, path){
    debug('sub', path)
    const exists = this.getTopic(path, false)
    const topic = this.getTopic(path)
    const node = this.getNodeByPeer(peer)

    debug('subscribe', path, peer.uuid)

    topic.subscribe(node)
    node.subscribe(topic)

    if(topic.path.indexOf('/dataparty/document/') != -1 && !exists){
      const [arg0, arg1, docType, docId] = topic.path.substr(1).split('/')
      debug('is document watcher', docType+':'+docId)

      peer.party.hostParty.db.on(docType+':'+docId, async (event)=>{
        await this.handleDocChange(topic.path, event)
      })
    }
  }

  async handleDocChange(path, event){
    debug('handleDocChange', path)
    const topic = this.getTopic(path,false)

    debug('\ttopic',topic)

    if(!topic){return}

    const [arg0, arg1, docType, docId] = topic.path.substr(1).split('/')

    if(topic.subscribers.size > 0){
      await topic.publish({
        id: event.msg.id,
        type: event.msg.type,
        revision: event.msg.revision,
        operationType: event.event
      })
    }
  }

  async unadvertise(peer, path){
    const topic = this.getTopic(path)
    const node = this.getNodeByPeer(peer)

    debug('unadvertise', path, peer.uuid)

    topic.unadvertise(node)
    node.unadvertise(topic)
  }

  async unsubscribe(peer, path){
    const topic = this.getTopic(path)
    const node = this.getNodeByPeer(peer)

    debug('unsubscribe', path, peer.uuid)

    topic.unsubscribe(node)
    node.unsubscribe(topic)
  }

  async publish(peer, path, data){
    const topic = this.getTopic(path, false)
    const sender = this.getNodeByPeer(peer)

    debug('publish', path, peer.uuid)

    await topic.publish(data, sender)
  }

  async publishInternal(path, data){
    const topic = this.getTopic(path, false)

    debug('publishInternal', path)

    await topic.publish(data)
  }

  async destroyNode(peer){
    const node = this.getNodeByPeer(peer)

    debug('destroying node', node.uuid)

    node.destroy()

    this.nodesByUuid.delete(node.uuid)
  }

  async cleanUpTopics(){

  }
}

module.exports = LocalTopicHost