
let topics = {};

export const subscribe = (topic, listener, swid = null, priority = null) => {
  //instantiate topic if it doesn't exist
  if (!topics[topic]) {
    topics[topic] = { queue: [] };
  }

  if (swid) {
    topics[topic].queue = topics[topic].queue.filter(x => x.swid !== swid);
  }

  let index = topics[topic].queue.push({ func: listener, interrupt: priority, okInSession: 0, swid: swid }) - 1;

  //return remove function to allow subscriber to stop listening
  return ((topic, index) => {
    return {
      remove: () => {
        topics[topic].queue.splice(index);
      }
    }
  }).bind(this, topic, index)
}

export const publish = (topic, arg = {}, callback = null, session = null) => {
  if (!topics[topic] || topics[topic].queue.length === 0) {
    if (callback) {
      callback(arg);
    }
    return;
  }

  if (session === null) {
    session = performance.now();
  }

  //call subs async
  window.setTimeout(() => {
    let items = topics[topic].queue;
    let interrupts = items.filter((x) => { return x.interrupt !== null && x.okInSession !== session; });
    let nonInterrupts = items.filter((x) => { return x.interrupt === null; });

    if (interrupts.length > 0) {
      interrupts.orderBy('interrupt').reverse();
    }

    let cancelPublish = false;
    for (let i = 0; !cancelPublish && i < interrupts.length; i++) {
      //call subscriber function and pass argument
      interrupts[i].func(arg, (retry, newArg = null) => {
        if (retry) {
          interrupts[i].okInSession = session;
          this.publish(topic, newArg ? newArg : arg, callback, session);
        } else {
          cancelPublish = true;
        }
      }, topic);
    }

    if (!cancelPublish) {
      for (let i = 0; i < nonInterrupts.length; i++) {
        //call subscriber function and pass argument
        nonInterrupts[i].func(arg);
      }
    }

    if (callback != null && !cancelPublish) {
      callback(arg);
    }
  }, 0);
}
