// universalPusher.js
'use strict';
import { LRUCache } from 'lru-cache';

import { CONFIG_PRIORITY_PRESENCE_CLIENT } from '../resource/configPriority.js';
import {
  bindEncryptedEvents,
  bindSubscriptionEvents,
} from './pusherController.js';
import {
  STREAM_ONLINE,
  STREAM_OFFLINE,
  STREAM_VIEWERS_UPDATED,
  CONFIGURATION_UPDATED,
  GOAL_ADDED,
  GOAL_ENDED,
  GOAL_STARTED,
  COUNTER_ADDED,
  GOAL_PROGRESS_UPDATED,
  DEVICE_ONLINE,
  DEVICE_OFFLINE,
} from './PusherEvents.js';
import { pusher as pusherDebug } from '../resource/debug.js';
import { addSecretKey } from './pusherKeychain.js';

// Namespaced loggers derived from the shared pusher debugger. Every function
// in this file extends them again with its own name: `debugLog` for regular
// trace output, `debugErrorLog` for error output.
const debugLog = pusherDebug.extend('log');
const debugErrorLog = pusherDebug.extend('error');

// Only channels whose name starts with one of these prefixes are actively
// unsubscribed once no tab targets them anymore.
const canUnsubscribeChannelPrefixes = ['private-enc-user@'];

// Ordered lookup table: the first entry whose `regex` matches a channel name
// provides the `binder` that wires encrypted events for that channel.
// Each binder receives `{ matchResult, channel }`.
const channelEventBinders = [
  {
    // presence channel of a client, keyed by a 36-character id
    regex: /^presence-enc-client@([0-9a-z-]{36})$/,
    binder({ channel }) {
      bindEncryptedEvents(channel, event => {
        // only configuration updates carry an explicit priority
        if (event === CONFIGURATION_UPDATED) {
          return { priority: CONFIG_PRIORITY_PRESENCE_CLIENT };
        }
        return {};
      });
    },
  },
  {
    // private channel of a user, keyed by a 24-character id
    regex: /^private-enc-user@([0-9a-z]{24})$/,
    binder({ matchResult: [, userId] = [], channel }) {
      bindEncryptedEvents(channel, event => {
        // for these events the user id is additionally exposed as streamId
        const eventsWithStreamId = [
          COUNTER_ADDED,
          STREAM_ONLINE,
          STREAM_OFFLINE,
          STREAM_VIEWERS_UPDATED,
          GOAL_ADDED,
          GOAL_ENDED,
          GOAL_STARTED,
          GOAL_PROGRESS_UPDATED,
          DEVICE_ONLINE,
          DEVICE_OFFLINE,
        ];
        if (eventsWithStreamId.includes(event)) {
          return { userId, streamId: userId };
        }
        return { userId };
      });
    },
  },
  {
    // private channel of a stream, keyed by a 24-character id
    regex: /^private-enc-stream@([0-9a-z]{24})$/,
    binder({ matchResult: [, streamId] = [], channel }) {
      // every event on this channel gets the same static context
      bindEncryptedEvents(channel, { streamId });
    },
  },
];
/**
 * Subscribe `pusherInstance` to `channelName` and attach the event handlers
 * registered for that kind of channel in `channelEventBinders`.
 * When no registered pattern matches the channel name, nothing is subscribed
 * and the problem is only reported through the error logger.
 * @param {Object} params
 * @param {string} params.channelName - name of the channel to subscribe
 * @param {Object} params.pusherInstance - pusher instance used to subscribe
 * @throws {Error} when pusherInstance is missing
 */
const subscribeChannel = ({ channelName, pusherInstance }) => {
  const log = debugLog.extend('subscribeChannel');
  const errorLog = debugErrorLog.extend('subscribeChannel');
  log('start', { channelName, pusherInstance });

  if (!pusherInstance) {
    errorLog('missing pusherInstance', { channelName, pusherInstance });
    throw new Error('missing pusherInstance');
  }

  // take the first registered pattern matching the channel name and keep
  // its match result, the binder needs the captured id
  let matchResult = null;
  let binder;
  for (const candidate of channelEventBinders) {
    matchResult = channelName.match(candidate.regex);
    if (matchResult) {
      binder = candidate.binder;
      break;
    }
  }

  if (!binder) {
    errorLog('missing binder', { channelName });
    return;
  }

  log('subscribing', { channelName, pusherInstance });
  const channel = pusherInstance.subscribe(channelName);
  bindSubscriptionEvents(channel);
  binder({ matchResult, channel });
};

/**
 * Process pusher subscription target channels collected from every browser tab.
 * The targets are processed with these steps:
 *   * Obtain targets from each tab and merge them (getMergedPusherTargetsFromTabs())
 *   * Write the merged data back to each tab for debugging (scope.workerMessenger.set())
 *   * Compare targets with pusherInstance to compute channel differences (getChannelDiffs())
 *   * Unsubscribe extra channels in pusherInstance (only channels whose name
 *     starts with a prefix listed in canUnsubscribeChannelPrefixes)
 *   * Get stale channels, i.e. subscribed channels that need fresh authorize
 *     data (getStaleChannels())
 *   * Compute the next batch of channels from missing channels, stale channels
 *     and the subscribe records used for backoff (getNextBatchChannels())
 *   * Fetch authorize data for the next batch (fetchBatchAuthenticate()), then
 *     subscribe channels that are not subscribed yet (subscribeChannel()) and
 *     forward the data of already subscribed ones to pusherInstance.onAuth
 *
 * The `isSubscribing` flags raised for the batch are always lowered again,
 * even when fetching or subscribing throws; otherwise getNextBatchChannels()
 * would skip those channels on every later run.
 *
 * @param {Object} params
 * @param {Array} [params.tabIds=[]] - tab ids of same domain
 * @param {Object} [params.scope={}] - scope object to read/write data
 * @param {Object} params.pusherInstance - pusher instance in use
 * @returns {Promise<void>}
 * @throws {Error} when pusherInstance is missing
 */
export const processPusherTargets = async function ({
  tabIds = [],
  scope = {},
  pusherInstance,
}) {
  const log = debugLog.extend('processPusherTargets');
  const errorLog = debugErrorLog.extend('processPusherTargets');
  log('start', { self: self, scope, pusherInstance });

  if (!pusherInstance) {
    errorLog('missing pusherInstance', { tabIds, scope, pusherInstance });
    throw new Error('missing pusherInstance');
  }

  scope.pusher = scope.pusher || {};
  scope.pusher.record = scope.pusher.record || {};

  const mergedPusherTargets = await getMergedPusherTargetsFromTabs({
    workerMessenger: scope.workerMessenger,
    tabIds,
  });
  log('getMergedPusherTargetsFromTabs()', {
    mergedPusherTargets,
    scope,
    tabIds,
  });

  // update mergedPusherTargets to scopes on main threads; deliberately not
  // awaited, Promise.allSettled() never rejects
  Promise.allSettled(
    tabIds.map(tabId => {
      return scope.workerMessenger.set({
        selectPath: ['pusher', 'mergedTargets'],
        value: mergedPusherTargets,
        options: { objectDepth: 2, tabId },
      });
    })
  );

  const { extras: extraChannels, missings: missingChannels } = getChannelDiffs({
    targets: mergedPusherTargets,
    pusherInstance,
  });
  log('getChannelDiffs()', {
    extraChannels,
    missingChannels,
    mergedPusherTargets,
  });

  // unsubscribe extra channels
  extraChannels
    .filter(channel =>
      canUnsubscribeChannelPrefixes.some(prefix =>
        channel.name.startsWith(prefix)
      )
    )
    .forEach(targetChannel => {
      const channel = pusherInstance.channels.channels[targetChannel.name];
      channel?.unsubscribe();
      channel?.disconnect();
      delete pusherInstance.channels.channels[targetChannel.name];
    });

  const staleChannels = getStaleChannels({
    targets: mergedPusherTargets,
    record: scope.pusher.record,
    pusherInstance,
  });
  log('getStaleChannels()', {
    staleChannels,
    mergedPusherTargets,
    record: scope.pusher.record,
    pusherInstance,
  });

  // handle missing and stale channels
  const now = Date.now();
  const nextBatchChannels = getNextBatchChannels({
    record: scope.pusher.record,
    missingChannels,
    staleChannels,
    now,
  });
  log('getNextBatchChannels()', {
    nextBatchChannels,
    missingChannels,
    record: JSON.parse(JSON.stringify(scope.pusher.record)),
    now,
  });

  // increase retriedCount and raise subscribing flags
  // NOTE(review): stale channels are passed through getNextBatchChannels()
  // without a retriedCount, so NaN is stored for them here; it is read back
  // there with `|| 0` - confirm this reset is intended.
  nextBatchChannels.forEach(channel => {
    const record = scope.pusher.record;
    scope.pusher.record = Object.assign(record, {
      [channel.channelName]: {
        ...record[channel.channelName],
        retriedCount: channel.retriedCount + 1,
        isSubscribing: true,
      },
    });
  });

  try {
    // fetch auth data, we fetch auth by ourselves to utilize better retry control
    const {
      authEndpoint: endpoint,
      appId,
      language,
      headers,
    } = scope.pusher.config || {};
    const fetchBatchAuthenticateParam = {
      channels: nextBatchChannels,
      endpoint,
      appId,
      socketId: pusherInstance.connection.socket_id,
      language,
      headers,
    };
    const channelsData = await fetchBatchAuthenticate(
      fetchBatchAuthenticateParam
    );
    log('fetchBatchAuthenticate()', {
      channelsData,
      nextBatchChannels,
      fetchBatchAuthenticateParam,
    });

    // use fetched data to subscribe channels
    Object.keys(channelsData ?? {}).forEach(channelName => {
      // save fetchedTimestamp for expiry check
      if (!scope.pusher.record[channelName]) {
        // not sure why sometimes the record is missing
        scope.pusher.record[channelName] = { retriedCount: 1 };
      }
      scope.pusher.record[channelName].fetchedTimestamp = Date.now();

      if (!pusherInstance.channels.channels[channelName]?.subscribed) {
        // only subscribe missing channels
        subscribeChannel({ channelName, pusherInstance });
        log('subscribe missing channel', {
          pusherInstance,
          channelName,
          data: channelsData[channelName],
        });
      } else {
        // send auth channel data to main thread
        pusherInstance.onAuth?.({
          channel: channelName,
          data: channelsData[channelName],
        });
        log('send auth channel data for stale channel', {
          pusherInstance,
          channelName,
          data: channelsData[channelName],
        });
      }
    });
  } finally {
    // reset subscribing flags, also on failure, so the batch can be retried
    const record = scope.pusher.record;
    nextBatchChannels.forEach(channel => {
      if (record[channel.channelName]) {
        record[channel.channelName].isSubscribing = false;
      }
    });
  }
};

/**
 * Read the pusher targets of every tab through `workerMessenger` and merge
 * them into a single map keyed by channel name.
 *
 * Each tab is given 100 ms to answer; tabs that time out or fail are logged
 * and ignored. For every target the `subscriptionMap` is collapsed into a
 * boolean `shouldSubscribe` (true when any entry is truthy) and dropped. A
 * target without a `subscriptionMap` counts as "no subscriptions" instead of
 * aborting the whole merge. The objects received from the tabs are not
 * modified.
 *
 * When the same channel is targeted by several tabs the result contains:
 * `isCritical` / `shouldSubscribe` if any tab says so, the smallest
 * `timestamp` and the largest `expiryTimestamp`.
 *
 * @param {Object} params
 * @param {Object} params.workerMessenger - messenger exposing `get()`
 * @param {Array} [params.tabIds=[]] - tab ids to query
 * @returns {Promise<Object|undefined>} merged targets by channel name, or
 *   undefined when an unexpected error occurred (it is logged)
 */
const getMergedPusherTargetsFromTabs = async ({
  workerMessenger,
  tabIds = [],
}) => {
  const log = debugLog.extend('getPusherTargetsFromTabs');
  const errorLog = debugErrorLog.extend('getPusherTargetsFromTabs');
  log('start', { workerMessenger, tabIds });

  try {
    const allSettledResults = await Promise.allSettled(
      tabIds.map(tabId => {
        const request = workerMessenger.get({
          selectPath: ['pusher', 'target'],
          options: { objectDepth: 2, tabId },
        });
        let timeoutId;
        const timeout = new Promise((_, reject) => {
          // there will be extra tabIds on Safari which never resolves
          timeoutId = setTimeout(
            () => reject(new Error('get pusher target timeout')),
            100
          );
        });
        return Promise.race([request, timeout])
          .then(result => {
            log('get one tab target result', result, tabId);
            return result;
          })
          .catch(error => {
            errorLog('get one tab target error', error, tabId);
          })
          .finally(() => {
            // do not keep the timer pending once the tab has answered
            clearTimeout(timeoutId);
          });
      })
    );
    log('allSettled()', { allSettledResults });

    const targetsArray = allSettledResults
      .filter(result => result.status === 'fulfilled' && result.value)
      .map(result => result.value);
    log('fulfilled targets', { targetsArray });

    // merge pusher targets between browser tabs
    const mergedPusherTargets = targetsArray.reduce((acc, targets) => {
      Object.keys(targets).forEach(channelName => {
        const rawTarget = targets[channelName];
        if (!rawTarget) {
          // nothing to merge for an empty entry
          return;
        }

        // merge target.subscriptionMap into target.shouldSubscribe, working
        // on a copy so the object received from the tab stays untouched
        const { subscriptionMap, ...rest } = rawTarget;
        const target = {
          ...rest,
          shouldSubscribe: Object.values(subscriptionMap ?? {}).some(Boolean),
        };

        if (!acc[channelName]) {
          acc[channelName] = target;
        } else {
          const current = acc[channelName];
          acc[channelName] = {
            channelName,
            isCritical: current.isCritical || target.isCritical,
            shouldSubscribe: current.shouldSubscribe || target.shouldSubscribe,
            timestamp: Math.min(current.timestamp, target.timestamp),
            expiryTimestamp: Math.max(
              current.expiryTimestamp,
              target.expiryTimestamp
            ),
          };
        }
      });
      return acc;
    }, {});
    log('merged', { mergedPusherTargets });

    return mergedPusherTargets;
  } catch (error) {
    errorLog('error', error);
  }
};

/**
 * Compare the wanted channels (`targets`) with the channels currently held by
 * `pusherInstance` and report the ones whose subscription state disagrees.
 * @param {Object} params
 * @param {Object} [params.targets={}] - targets by channel name, each with a
 *   `shouldSubscribe` flag
 * @param {Object} params.pusherInstance - pusher instance to compare against
 * @returns {{extras: Array, missings: Array}} `extras`: subscribed channels
 *   that should not be subscribed (a copy of the channel with `channelName`
 *   and `shouldSubscribe: false`); `missings`: targets that should be
 *   subscribed but are not
 * @throws {Error} when pusherInstance is missing
 */
const getChannelDiffs = ({ targets = {}, pusherInstance }) => {
  const log = debugLog.extend('getChannelDiffs');
  const errorLog = debugErrorLog.extend('getChannelDiffs');

  if (!pusherInstance) {
    const error = new Error('missing pusherInstance');
    errorLog(error);
    throw error;
  }

  const current = pusherInstance.channels.channels;
  log('start', { targets, current });

  // every channel name known to either side, without duplicates
  const combined = Array.from(
    new Set(Object.keys(current).concat(Object.keys(targets)))
  );

  const diffs = {};
  for (const channelName of combined) {
    const wanted = Boolean(targets[channelName]?.shouldSubscribe);
    const active = Boolean(current[channelName]?.subscribed);
    if (wanted === active) {
      continue;
    }
    if (wanted) {
      diffs[channelName] = targets[channelName];
    } else {
      diffs[channelName] = {
        channelName,
        ...current[channelName],
        shouldSubscribe: wanted,
      };
    }
  }
  log('diffs', { diffs, combined });

  const extras = [];
  const missings = [];
  for (const diff of Object.values(diffs)) {
    if (diff.shouldSubscribe) {
      missings.push(diff);
    } else {
      extras.push(diff);
    }
  }
  log('result', { extras, missings });

  return { extras, missings };
};

/**
 * Find subscribed channels whose authorize data is out of date: channels that
 * are subscribed in `pusherInstance`, are still wanted by `targets`, and whose
 * target `expiryTimestamp` is later than the `fetchedTimestamp` stored in
 * `record`.
 * A subscribed channel without a record entry (or without fetchedTimestamp)
 * cannot be compared and is therefore not reported as stale.
 * @param {Object} params
 * @param {Object} [params.targets={}] - targets by channel name
 * @param {Object} [params.record={}] - subscribe records by channel name
 * @param {Object} params.pusherInstance - pusher instance in use
 * @returns {Array} the targets of the stale channels
 * @throws {Error} when pusherInstance is missing
 */
const getStaleChannels = ({ targets = {}, record = {}, pusherInstance }) => {
  const log = debugLog.extend('getStaleChannels');
  const errorLog = debugErrorLog.extend('getStaleChannels');

  if (!pusherInstance) {
    const error = new Error('missing pusherInstance');
    errorLog(error);
    throw error;
  }

  const current = pusherInstance.channels.channels;
  log('start', { targets, record, current });

  const staleChannelNames = Object.keys(current).filter(channelName => {
    if (!current[channelName]?.subscribed) {
      return false;
    }
    const target = targets[channelName];
    if (!target?.shouldSubscribe) {
      return false;
    }
    // a missing record yields `undefined`, which never compares as older
    return target.expiryTimestamp > record[channelName]?.fetchedTimestamp;
  });

  return staleChannelNames.map(channelName => targets[channelName]);
};

// Backoff multipliers: the n-th retry waits `retryUnit * FibonacciSequence[n]`
// on top of the previous retries; past the end the last value is reused.
const FibonacciSequence = [1, 1, 2, 3, 5, 8, 13, 21, 34];

/**
 * Pick the channels to authorize in the next round.
 *
 * For every missing channel that is not already being subscribed, the next
 * retry time is `timestamp` plus the accumulated Fibonacci backoff for its
 * `retriedCount` (taken from `record`). The batch is then filled, in this
 * order and capped at `batchSize`: critical stale, critical missing (due),
 * non-critical stale, non-critical missing (due). Stale channels are ordered
 * by `expiryTimestamp`, missing ones by their next retry time.
 * When the batch is neither empty nor full, the remaining slots are stuffed
 * with missing channels that are not due yet, critical ones first.
 *
 * @param {Object} params
 * @param {Object} [params.record={}] - subscribe records by channel name
 * @param {Array} [params.missingChannels=[]] - targets not subscribed yet
 * @param {Array} [params.staleChannels=[]] - targets needing fresh auth data
 * @param {number} [params.now=Date.now()] - reference time in msec
 * @param {number} [params.batchSize=10] - maximum number of channels returned
 * @param {number} [params.criticalRetryUnitMsec=1000] - backoff unit of critical channels
 * @param {number} [params.nonCriticalRetryUnitMsec=5000] - backoff unit of other channels
 * @returns {Array} channels of the next batch; missing channels are copies
 *   extended with retriedCount, nextRetryTimestamp and nextRetryOffsetMsec
 */
const getNextBatchChannels = ({
  record = {},
  missingChannels = [],
  staleChannels = [],
  now = Date.now(),
  batchSize = 10,
  criticalRetryUnitMsec = 1000, // TODO: remote config
  nonCriticalRetryUnitMsec = 5000, // TODO: remote config
}) => {
  const log = debugLog.extend('getNextBatchChannels');
  log('start', {
    record,
    missingChannels,
    now,
    batchSize,
    criticalRetryUnitMsec,
    nonCriticalRetryUnitMsec,
  });

  const lastMultiplier = FibonacciSequence[FibonacciSequence.length - 1];
  const byNextRetry = (left, right) =>
    left.nextRetryTimestamp - right.nextRetryTimestamp;
  const byExpiry = (left, right) =>
    left.expiryTimestamp - right.expiryTimestamp;
  const isDue = channel => channel.nextRetryTimestamp <= now;

  // compute next retry time for every channel that is not in flight
  const channelDataArray = missingChannels.flatMap(channel => {
    if (record[channel.channelName]?.isSubscribing) {
      return [];
    }
    const { timestamp, isCritical } = channel;
    const retriedCount = record[channel.channelName]?.retriedCount || 0;
    const retryUnit = isCritical
      ? criticalRetryUnitMsec
      : nonCriticalRetryUnitMsec;
    // sum of the backoff steps of all retries done so far
    const nextRetryOffsetMsec = [...new Array(retriedCount).keys()].reduce(
      (sum, index) =>
        sum + retryUnit * (FibonacciSequence[index] || lastMultiplier),
      0
    );
    return [
      {
        ...channel,
        timestamp,
        retriedCount,
        isCritical,
        nextRetryTimestamp: timestamp + nextRetryOffsetMsec,
        nextRetryOffsetMsec,
      },
    ];
  });

  // due missing channels, split by criticality
  const dueCriticalMissing = channelDataArray
    .filter(channel => channel.isCritical && isDue(channel))
    .sort(byNextRetry)
    .slice(0, batchSize);
  const dueNonCriticalMissing = channelDataArray
    .filter(channel => !channel.isCritical && isDue(channel))
    .sort(byNextRetry)
    .slice(0, batchSize);

  // stale channels, split by criticality
  const criticalStale = staleChannels
    .filter(channel => channel.isCritical)
    .sort(byExpiry)
    .slice(0, batchSize);
  const nonCriticalStale = staleChannels
    .filter(channel => !channel.isCritical)
    .sort(byExpiry)
    .slice(0, batchSize);

  const batchedChannelDataArray = criticalStale
    .concat(dueCriticalMissing, nonCriticalStale, dueNonCriticalMissing)
    .slice(0, batchSize);

  // fill batch to batchSize with channels not meet retry time yet
  const freeSlots = batchSize - batchedChannelDataArray.length;
  if (batchedChannelDataArray.length > 0 && freeSlots > 0) {
    const notDueYet = channelDataArray
      .filter(channel => channel.nextRetryTimestamp > now)
      .sort(byNextRetry)
      // stable sort: critical first, retry-time order kept within each group
      .sort((left, right) => !!right.isCritical - !!left.isCritical);
    batchedChannelDataArray.push(...notDueYet.slice(0, freeSlots));
  }

  log('result', { batchedChannelDataArray });
  return batchedChannelDataArray;
};

// Buffer of fetched authorize payloads, keyed by `${socketId}-${channelName}`.
// It is filled by fetchBatchAuthenticate() and read by bufferedAuthorizer();
// the LRU cache is configured with `max: 64`.
const authorizeBuffer = new LRUCache({ max: 64 });
/**
 * Fetch the authorize data of several channels with one request and store
 * each channel's payload in `authorizeBuffer` under `${socketId}-${channelName}`.
 *
 * The request URL is `endpoint` with the query parameters `app_id`,
 * `socket_id`, optionally `lang`, and one `channels` entry per channel name
 * (sorted). This function never rejects: missing parameters, an invalid
 * endpoint, network errors, non-OK HTTP responses and unusable response bodies
 * are logged and result in an empty object.
 *
 * @param {Object} params
 * @param {Array} [params.channels=[]] - channels to authorize, each with `channelName`
 * @param {string} params.endpoint - absolute URL of the authorize endpoint
 * @param {string} params.appId - value of the `app_id` query parameter
 * @param {string} params.socketId - pusher socket id
 * @param {string} [params.language] - value of the `lang` query parameter
 * @param {Object} [params.headers={}] - request headers
 * @returns {Promise<Object>} response payload by channel name, `{}` on failure
 */
const fetchBatchAuthenticate = async ({
  channels = [],
  endpoint,
  appId,
  socketId,
  language,
  headers = {},
}) => {
  const log = debugLog.extend('fetchBatchAuthenticate');
  const errorLog = debugErrorLog.extend('fetchBatchAuthenticate');

  log('start', { channels, endpoint, appId, socketId, language, headers });

  if (!channels?.length || !endpoint || !appId || !socketId) {
    errorLog('missing params', { channels, endpoint, appId, socketId });
    // same shape as every other return path
    return {};
  }

  try {
    // inside the try block: new URL() throws on an invalid endpoint
    const url = new URL(endpoint);
    url.searchParams.set('app_id', appId);
    url.searchParams.set('socket_id', socketId);
    if (language) {
      url.searchParams.set('lang', language);
    }

    channels
      .map(channel => channel.channelName)
      .sort()
      .forEach(channel => {
        url.searchParams.append('channels', channel);
      });

    const options = {
      headers: {
        ...headers,
      },
    };

    log('fetch params', { href: url.href, options });
    const response = await fetch(url.href, options);
    if (!response.ok) {
      // an error body must not be buffered or returned as authorize data
      errorLog('fetch error', { status: response.status });
      return {};
    }

    const payload = await response.json();
    if (!payload || typeof payload !== 'object') {
      errorLog('fetch error', { payload });
      return {};
    }

    Object.keys(payload).forEach(channelName => {
      authorizeBuffer.set(`${socketId}-${channelName}`, payload[channelName]);
    });
    return payload;
  } catch (error) {
    errorLog('fetch error', { error });
    return {};
  }
};

/**
 * Answer a pusher authorize request from the authorize data that was fetched
 * and buffered earlier, without any network access.
 * On a hit the payload's `shared_secret` is registered through addSecretKey()
 * and the payload is passed to `callback`; on a miss `callback` receives an
 * error.
 * @param {Object} params
 * @param {string} params.channelName - pusher channel name
 * @param {string} params.socketId - pusher socket id
 * @param {Object} [params.options={}] - options, only used for logging
 * @param {function} params.callback - authorize callback,
 *   arg1: false || authError, arg2: authData
 */
export const bufferedAuthorizer = ({
  channelName,
  socketId,
  options = {},
  callback,
}) => {
  const log = debugLog.extend('bufferedAuthorizer');
  const errorLog = debugErrorLog.extend('bufferedAuthorizer');
  log('start', { channelName, socketId, options });

  const auth = authorizeBuffer.get(`${socketId}-${channelName}`);
  if (!auth) {
    errorLog('missing auth', { channelName, socketId, options });
    callback(new Error('missing auth'));
    return;
  }

  log('auth hit', { channelName, socketId, auth });
  addSecretKey({ channelName, secretKey: auth.shared_secret });
  callback(false, auth);
};
