Skip to content

Source: lib/util/publisher.js

/**
 * The publisher module. Provides pub/sub functionality with extensive wildcard support and also content based filtering.
 * @module lib/util/publisher
 * @author Frank Kudermann - alphanull
 * @version 1.5.1
 * @license MIT
 */
export default {
    configure,
    publish,
    subscribe,
    unsubscribe,
    removePersistentMessage
};

/* INTERNAL PROPERTIES */

/**
 * Global options for the publisher, can be changed by the setOptions method.
 * @private
 * @memberof module:lib/util/publisher
 * @type {module:lib/util/publisher~globalOptions}
 */
const globalOptions = {
    async: true,
    handleExceptions: false,
    lenientUnsubscribe: false
};

/**
 * Internal value to create unique tokens. This is increased everytime you subscribe.
 * @private
 * @memberof module:lib/util/publisher
 * @type {number}
 */
let tokenCounter = -1;

/**
 * Object which contains all subscribers, using the token as key.
 * @private
 * @memberof module:lib/util/publisher
 * @type     {Map<module:lib/util/publisher~subscriberObject>}
 * @property {module:lib/util/publisher~subscriberObject}      subscriberObject  The subscriber object containing all necessary information.
 */
const subscrs = new Map();

/**
 * Object which contains a tree graph of all subscriptions.
 * @private
 * @memberof module:lib/util/publisher
 * @type {Map<Object>}
 */
const subsTree = new Map();

/**
 * Object which contains persistent messages (published with the "persist" option).
 * @private
 * @memberof module:lib/util/publisher
 * @type {Map<Object>}
 */
const persistentMessages = new Map();

/**
 * Executes a function asynchronously, using the most performant available method.
 * Falls back to setTimeout if no microtask queue is available.
 * @private
 * @memberof module:lib/util/publisher
 * @param {Function} fn  The function to execute.
 */
function deferExecution(fn) {
    if (typeof queueMicrotask === 'function') queueMicrotask(fn);
    else if (typeof Promise === 'function') Promise.resolve().then(fn);
    else setTimeout(fn, 0);
}

/**
 * Checks if the given value is undefied.
 * @param   {*}       obj  The value to check.
 * @returns {boolean}      True if the value is undefined, false otherwise.
 */
function isUndefined(obj) {
    return typeof obj === 'undefined';
}

/* PUBLIC API */

/**
 * Sets global options for the publisher. All subsequent actions use these options.
 * @memberof module:lib/util/publisher
 * @param {Object<module:lib/util/publisher~globalOptions>} options  An object containing various options.
 */
export function configure({ async, handleExceptions, lenientUnsubscribe } = {}) {

    if (!isUndefined(async)) globalOptions.async = async;
    if (!isUndefined(handleExceptions)) globalOptions.handleExceptions = handleExceptions;
    if (!isUndefined(lenientUnsubscribe)) globalOptions.lenientUnsubscribe = lenientUnsubscribe;

}

/**
 * Publishes a message to all matching subscribers.
 * @memberof module:lib/util/publisher
 * @param   {string}  topic                       The topic of this message, may be separated with '/' for subtopics.
 * @param   {Object}  [data]                      The data that should be sent along with the event. Can be basically any javascript object.
 * @param   {Object}  [options={}]                Additional options.
 * @param   {boolean} [options.async]             Specify if we should deliver this  message directly or with a timeout. Overrides the global setting.
 * @param   {boolean} [options.handleExceptions]  Specify if we should catch any exceptions while sending this message. Overrides the global setting.
 * @param   {boolean} [options.persist]           If this is set to true, the messages is saved for later subscribers which want to be notified of persistent messages.
 * @param   {boolean} [options.cancelable]        If set to "true" this message cannot be cancelled (when sending synchronously).
 * @returns {boolean}                             Returns false if a synchronous event was cancelled by a handler.
 * @throws  {Error}                               When trying to use a wildcard for publishing.
 */
export function publish(topic, data, options = {}) {

    // check if this message is persistent and must be saved for later use
    if (options.persist === true) {
        persistentMessages.set(topic, {
            data,
            options
        });
    }

    if (topic.indexOf('*') > -1) throw new Error('Publish topic cannot contain any wildcards.');

    const matchedSubscribers = findSubscribers(topic.split('/'), data, options, subsTree, topic);

    // sort handlers by priority
    matchedSubscribers.sort((a, b) => {
        if (a.priority === b.priority) { return a.position - b.position; } // emulate stable sort
        if (a.priority > b.priority) { return -1; }
        return 1;
    });

    // finally, execute them
    let subscriber;

    const async = isUndefined(options.async) ? globalOptions.async : options.async;

    while ((subscriber = matchedSubscribers.shift())) {

        if (async) {
            deferExecution(executeHandler.bind(null, subscriber, topic, data, options));
        } else if (executeHandler(subscriber, topic, data, options) === false && options.cancelable !== false) {
            return false;
        }

    }

    return true;

}

/**
 * Subscribes to certain message(s).
 * @memberof module:lib/util/publisher
 * @param   {string}   topic                       The topic in which the subscriber is interested. Note that you can use wildcards, ie. The topic "*" will subscribe to all messages.
 * @param   {Function} handler                     The handler to execute when a matching message is found.
 * @param   {Object}   [options={}]                Additional options.
 * @param   {boolean}  [options.async]             Specify if we should deliver this  message directly or with a timeout. Overrides the global setting.
 * @param   {boolean}  [options.handleExceptions]  Specify if we should catch any exceptions while sending this message. Overrides the global setting.
 * @param   {boolean}  [options.persist]           If this is set to true, the subscriber is notified of any former, persistent messages.
 * @param   {Function} [options.condition]         A function which receives this topic and data just before execution, if present. If this returns anything but true, the message is not delivered.
 * @param   {number}   [options.priority=0]        Specifies with which priority the handler should be executed. The higher the number, the higher the priority. Default is "0", negative values are allowed.
 * @param   {number}   [options.invocations]       Specifies how many times the subscriptions should be executed after a matching event. If this value reaches "0", the handler is automatically unsubscribed.
 * @returns {number}                               The internal token of the new subscriber. Can be used for later unsubscribing.
 * @throws  {Error}                                If topic or handler are undefined.
 */
export function subscribe(topic, handler, options = {}) {

    const token = tokenCounter += 1;

    if (isUndefined(topic)) {
        throw new Error('Subscribe failed - undefined Topic.');
    }

    if (topic.includes('undefined')) {
        throw new Error(`Subscribe for '${topic}' failed - found 'undefined' in topic, this is almost always an error: ${token}`);
    } // $$$ TEMP

    if (isUndefined(handler)) {
        throw new Error(`Subscribe for '${topic}' failed - undefined Handler`);
    } // $$$ TEMP

    // Add to Subscribers Object
    const subscriber = {
        token,
        topic,
        handler,
        options
    };

    subscrs.set(token, subscriber);
    addSubscription(topic.split('/'), subscriber, subsTree); // build Graph

    if (options.persist !== true) return token; // return the token (as Number)

    const regex = new RegExp(`^${topic.replace('*', '(.+)')}(/.+)?$`);

    // check persistent messages for matching topic
    for (const [key, persistentMessage] of persistentMessages) {

        const match = key.match(regex);

        if (match) {

            const async = isUndefined(persistentMessage.options.async) ? globalOptions.async : persistentMessage.options.async;

            if (async) {
                deferExecution(executeHandler.bind(null, subscriber, key, persistentMessage.data, options));
            } else if (executeHandler(subscriber, key, persistentMessage.data, options) === false && options.cancelable === true) {
                break;
            }
        }
    }

    return token; // return the token (as Number)

}

/**
 * Unsubscribes one or more subscribers. Note that here, the second argument can mean either a handler or the "lenient" option.
 * @memberof module:lib/util/publisher
 * @param  {number|number[]|string} topicOrToken          The token or the topic to unsubscribe. In the first case, these also can be in an Array to support multiple unsubscriptions.
 * @param  {Function|boolean}       [handler]             If specified, the message is only unsubscribed if the handler also matches.
 * @param  {boolean}                [lenientUnsubscribe]  If set to true, unsubscribe won't throw an error if the handler or token is not found.
 * @throws {Error}                                        If no subscribers were found.
 */
export function unsubscribe(topicOrToken, handler, lenientUnsubscribe) {

    const lenientArg = handler === Boolean(handler) ? handler : lenientUnsubscribe,
          lenient = isUndefined(lenientArg) ? globalOptions.lenientUnsubscribe : lenientArg;

    const unsubscribeToken = token => {

        const subscriber = subscrs.get(token);

        if (isUndefined(subscriber)) {
            if (lenient === true) return;
            throw new Error(`Unsubscribe failed. Did not find subscriber for token: ${token}`);
        }

        removeSubscription(subscriber.topic.split('/'), subscriber, subsTree);
        subscrs.delete(token);

    };

    if (isUndefined(topicOrToken)) {
        if (lenient === true) return;
        throw new Error('Unsubscribe failed. No Arguments specified.');
    }

    // check for unsubscribe type
    if (Array.isArray(topicOrToken)) { // array of tokens, so iterate over it

        topicOrToken.forEach(topic => unsubscribeToken(topic));

    } else if (!isNaN(parseFloat(topicOrToken)) && isFinite(topicOrToken)) { // it's a Number, so it is a single token

        unsubscribeToken(topicOrToken);

    } else {

        // assume topic & handler based unsubscribe
        // check for existing handler first

        if (isUndefined(handler)) {
            if (lenient === true) return;
            throw new Error(`Unsubscribe failed. No handler for topic based unsubscribe specified ${topicOrToken}`);
        }

        // TODO: not really efficient,
        // consider walking the topic tree instead
        for (const [, subscriber] of subscrs) {
            if (subscriber.handler === handler && subscriber.topic === topicOrToken) {
                removeSubscription(topicOrToken.split('/'), subscriber, subsTree);
                subscrs.delete(subscriber.token);
                break;
            }
        }

    }

}

/**
 * Removes a previously added persistent message.
 * @memberof module:lib/util/publisher
 * @param {string} topic  The topic of the message to remove.
 */
export function removePersistentMessage(topic) {

    persistentMessages.delete(topic);

}

/* PRIVATE METHODS */

/**
 * This method actually executes the message handler.
 * @private
 * @memberof module:lib/util/publisher
 * @param   {module:lib/util/publisher~subscriberObject} subscriber    The subscriber to execute.
 * @param   {string}                                     topic         The message topic.
 * @param   {Object}                                     data          The message data which should be passed to the handler.
 * @param   {Object}                                     [options={}]  Object which holds the publish options.
 * @returns {any}                                                      Returns the handlers result.
 */
function executeHandler(subscriber, topic, data, options = {}) {

    // prevent async handler to be called when subscriber has been removed meanwhile
    if (!subscrs.has(subscriber.token)) { return; }

    if (subscriber.options.invocations > 0) {
        subscriber.options.invocations -= 1;
        if (subscriber.options.invocations < 1) unsubscribe(subscriber.token);
    }

    const { handler } = subscriber;

    if (options.handleExceptions !== true && globalOptions.handleExceptions !== true) {
        return subscriber.options.topicArg === true ? handler(topic, data) : handler(data, topic);
    }

    try {
        return subscriber.options.topicArg === true ? handler(topic, data) : handler(data, topic);
    } catch (e) {
        if (window.console && window.console.error) {
            window.console.error('Exception while executing publish handler: ', e);
        }
    }

}

/**
 * Internal Function to recursively walk the subscription graph according to the current topic scope. Any found subscribers are added to the return array.
 * @private
 * @memberof module:lib/util/publisher
 * @param   {string[]}                                     topicArray     Hold eachs segments of the topic in an array, also reflecting the current scope.
 * @param   {Object}                                       data           The current data which was sent along the message.
 * @param   {Object}                                       [options={}]   Current publishing options.
 * @param   {Object}                                       subscriptions  Part of the subscriptions tree, reflecting the current scope.
 * @param   {string}                                       originalTopic  Original message topic.
 * @param   {Array}                                        handlerArray   array with all found handlers.
 * @returns {module:lib/util/publisher~subscriberObject[]}                Array with matching subscribers.
 */
function findSubscribers(topicArray, data, options = {}, subscriptions, originalTopic, handlerArray = []) { // eslint-disable-line max-params

    const subscribers = subscriptions.get('subscribers') || new Map(),
          topics = subscriptions.get('topics');

    for (const [, subscriber] of subscribers) {

        const { condition, topicArg } = subscriber.options; // condition for content based publishing

        if (isUndefined(condition) || (topicArg === true ? condition(originalTopic, data) === true : condition(data, originalTopic) === true)) { // TODO: Refactor
            // last but not least, check if there are any additional conditions set
            subscriber.position = handlerArray.push(subscriber);
            subscriber.priority = subscriber.options.priority || 0;
            subscriber.async = Boolean(options.async || subscriber.options.async || globalOptions.async);
        }

    }

    if (topicArray.length && topics) {

        const subscription = topics.get(topicArray[0]),
              subscriptionWild = topics.get('*');

        if (!isUndefined(subscriptionWild) || !isUndefined(subscription)) {

            // if there are further subscriptions on this node, recurse
            if (!isUndefined(subscriptionWild)) {
                findSubscribers(topicArray.slice(1, topicArray.length), data, options, subscriptionWild, originalTopic, handlerArray);
            }

            if (!isUndefined(subscription)) {
                findSubscribers(topicArray.slice(1, topicArray.length), data, options, subscription, originalTopic, handlerArray);
            }

            topicArray.shift();

        }

    }

    return handlerArray;

}

/**
 * Internal function to add a subscription to the subscriptions graph.
 * @private
 * @memberof module:lib/util/publisher
 * @param {string[]}                                           topicArray     Hold eachs segments of the topic in an array, also reflecting the current scope.
 * @param {Object<module:lib/util/publisher~subscriberObject>} subscriber     The subscriber to add.
 * @param {Object}                                             subscriptions  Part of the subscription graph, reflecting the current scope.
 */
function addSubscription(topicArray, subscriber, subscriptions) {

    const [topic] = topicArray;

    if (isUndefined(subscriptions.get('topics'))) {
        subscriptions.set('topics', new Map());
    }

    const subTopic = subscriptions.get('topics');

    let subscription = subTopic.get(topic);

    if (isUndefined(subscription)) {
        subscription = new Map();
        subTopic.set(topic, subscription);
    }

    if (topicArray.length < 2) {

        // last segment, add subscriber
        if (isUndefined(subscription.get('subscribers'))) {
            subscription.set('subscribers', new Map());
        }

        subscription.get('subscribers').set(subscriber.token, subscriber);

    } else {

        // recurse
        topicArray.shift();
        addSubscription(topicArray, subscriber, subscription);

    }

}

/**
 * Internal Function to recursively walk the subscription graph according to the current topic scope.
 * @private
 * @memberof module:lib/util/publisher
 * @param {string[]}                                           topicArray     Hold eachs segments of the topic in an array, also reflecting the current scope.
 * @param {Object<module:lib/util/publisher~subscriberObject>} subscriber     The subscriber to remove.
 * @param {Object}                                             subscriptions  Part of the subscriptions tree, reflecting the current scope.
 */
function removeSubscription(topicArray, subscriber, subscriptions) {

    const [topic] = topicArray,
          topics = subscriptions.get('topics'),
          subTopic = topics.get(topic),
          subscribers = subTopic.get('subscribers');

    if (topicArray.length < 2) {

        // last segment, remove subscriber
        subscribers.delete(subscriber.token);

        if (subscribers.size === 0) subTopic.delete('subscribers');

    } else {
        // recurse
        topicArray.shift();
        removeSubscription(topicArray, subscriber, subTopic);
    }

    // cleanup any empty graph elements
    if (subTopic.has('topics') && subTopic.get('topics').size === 0) subTopic.delete('topics');
    if (topics.get(topic).size === 0) topics.delete(topic);

}

/* TYPE DEFINITIONS */

/**
 * @typedef  {Object} module:lib/util/publisher~globalOptions    Options for the publisher. These can be changed on a global basis by the configure() method.
 * @property {boolean} [async=true]                If set to "false", messages are sent directly, otherwise with a timeout.
 * @property {boolean} [handleExceptions=false]    If this is true, any exceptions are catched, so that a faulty handler cannot disturb further publishing.
 * @property {boolean} [lenientUnsubscribe=false]  If this is true, unsubsribing a non-existant subscription or handler will not throw an error.
 */

/**
 * @typedef  {Object} module:lib/util/publisher~subscriberObject Structure of a single Subscriber
 * @property {string}                                              token      The token of this subscriber.
 * @property {string}                                              topic      The topic of this subscriber.
 * @property {Function}                                            handler    The handler to execute when a mathing publish event occurs.
 * @property {number}                                              timeOutId  The ID of the timeout when using async publish. Used to clean up when unsubscribe occurs and the handler is still waiting.
 * @property {Object<module:lib/util/publisher~subscriberOptions>} options    The various subscriber options.
 */

/**
 * @typedef  {Object} module:lib/util/publisher~subscriberOptions    Options for the subscriber. These can be used by publish or subscribe commands, except when noted.
 * @property {boolean}  [async]             If set to "false", messages are sent directly to this subscriber, otherwise with a timeout.
 * @property {boolean}  [handleExceptions]  If this is true, any exceptions are catched, so that a faulty handler cannot disturb further publishing.
 * @property {Function} [condition]         A function to be evaluated before a matching subscriber is executed. Must return true to continue.
 * @property {boolean}  [persist]           If this is set to true, the messages is saved for later subscribers which want to be notified of persistent messages.
 * @property {number}   [priority]          Subscribe only: specifies with which priority the handler should be executed. The higher the number, the higher the priority. Default is "0", negative values are allowed.
 */