import { isFunction, isNil, values } from 'lodash-es';
import { BehaviorSubject, Subject, timer, type Observable, type Subscription as RxJsSubscription } from 'rxjs';
import { repeat, retry } from 'rxjs/operators';
import { webSocket, type WebSocketSubject } from 'rxjs/webSocket';
import { AVA_PING, AVA_PONG, CANCEL, PAGE, SUBSCRIBE, type WL_PING, type WL_PONG } from '../tokens';

import type { ErrorSubscriptionResponse } from '../types';
import type { RequestStream } from '../types/RequestStream';
import type { Response } from '../types/Response';
import { logger, type LogContext } from '../utils';
import { formattedDateForSubscription } from '../utils/date';
import { Subscription, type SubscriptionCallback } from './subscription';
import { WebsocketErrorCodeEnum } from './types';

// Named constants for a few of the close codes used as keys in the table below.
const CLOSE_NORMAL = 1000;
const CLOSE_GOING_AWAY = 1001;
const CLOSE_NO_STATUS_RECEIVED = 1005;
const CLOSE_ABNORMAL = 1006;
/**
 * Human-readable labels for WebSocket close codes 1000-1015.
 * `getCloseCodeString` looks codes up here when building the "WebSocket closed" log entry.
 */
const WEBSOCKET_CLOSE_CODES = {
  [CLOSE_NORMAL]: 'Normal Closure',
  [CLOSE_GOING_AWAY]: 'Going Away',
  1002: 'Protocol Error',
  1003: 'Unsupported Data',
  1004: '(For future)',
  [CLOSE_NO_STATUS_RECEIVED]: 'No Status Received',
  [CLOSE_ABNORMAL]: 'Abnormal Closure',
  1007: 'Invalid frame payload data',
  1008: 'Policy Violation',
  1009: 'Message too big',
  1010: 'Missing Extension',
  1011: 'Internal Error',
  1012: 'Service Restart',
  1013: 'Try Again Later',
  1014: 'Bad Gateway',
  1015: 'TLS Handshake',
} as const;

/**
 * Maps a WebSocket close code to a short human-readable label for logging.
 *
 * The parameter is typed as `number` because a `CloseEvent.code` can be any close code, not just
 * the 1000-1015 keys of `WEBSOCKET_CLOSE_CODES`; the range checks below exist for exactly those
 * other values.
 *
 * @param code - The close code reported by the socket.
 * @returns The label for the code or for the range it falls in, or `undefined` when the code is
 *   outside every known range (negative, above 4999, fractional within 1000-1015, or `NaN`).
 */
function getCloseCodeString(code: number): string | undefined {
  if (code >= 0 && code <= 999) {
    return '(Unused)';
  }
  if (code <= 1015) {
    // Negative or fractional values also reach this branch; the lookup then yields `undefined`.
    return WEBSOCKET_CLOSE_CODES[code as keyof typeof WEBSOCKET_CLOSE_CODES];
  }
  if (code <= 1999) {
    return '(For WebSocket standard)';
  }
  if (code <= 2999) {
    return '(For WebSocket extensions)';
  }
  if (code <= 3999) {
    return '(For libraries and frameworks)';
  }
  if (code <= 4999) {
    return '(For applications)';
  }
  // Make the "unknown code" outcome explicit instead of falling off the end of the function.
  return undefined;
}

/**
 * Collects network details from the Network Information API (`navigator.connection`) so they can
 * be attached to log entries.
 *
 * This runs inside the socket's close observer, so it must never throw: a missing `navigator`
 * (non-browser environment) or a missing/null `connection` object both yield an empty object.
 *
 * @returns The connection fields, or `{}` when the API is unavailable. `connectionType` and `type`
 *   carry the same value; both keys are kept so existing log consumers keep working.
 */
function getConnectionData() {
  if (typeof navigator === 'undefined') {
    return {};
  }
  // `connection` is not part of the standard `Navigator` typings and is absent in many browsers,
  // hence the narrowly scoped `any`.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const connection: any = (navigator as any).connection;
  if (connection == null) {
    return {};
  }
  return {
    connectionType: connection.type,
    rtt: connection.rtt,
    effectiveType: connection.effectiveType,
    type: connection.type,
    downlink: connection.downlink,
    saveData: connection.saveData,
  };
}

/**
 * Error codes after which `WebSocketClient.handleMessage` marks the affected subscription as
 * inactive (`sub.active = false`).
 */
const ErrorCodesCausingInactiveSubscription = new Set([
  WebsocketErrorCodeEnum.WebsocketErrorDuplicateReqID,
  WebsocketErrorCodeEnum.WebsocketErrorInvalidRequest,
  WebsocketErrorCodeEnum.WebsocketErrorInvalidStream,
]);

/**
 * Contract for the WebSocket client: connection lifecycle, request/response style subscriptions
 * keyed by an `address`, one-way publications, and ping/pong.
 *
 * `TMessageType` is the payload type delivered to subscription callbacks; `TRequest` is the type of
 * the records sent in a publication's `data` array.
 */
export interface IWebSocketClient<TMessageType, TRequest extends object = object> {
  /** In `WebSocketClient`: set to true when a "hello" message is handled and to false when the socket closes. */
  isConnected: boolean;
  /** In `WebSocketClient`: the `session_id` of the most recent "hello" message, or null before the first one. */
  sessionId: string | null;
  /** Optional statistics collector; null when no collector is attached. */
  performance: WebSocketPerformance | null;

  /** Opens the socket at `url`; `options` tune the reconnect behavior. */
  connect(url: string, options?: IWebSocketClientConnectOptions): void;
  /** Closes the current connection and connects again to the last used URL. */
  closeAndReconnect(): void;
  /** Sends a message ("publication") and does not perform any subscription. Returns a cleanup function which should be called after some time to get rid of the onError handler. */
  registerPublication(request: { type: string; data: TRequest[] }, onError?: (err: any) => void): () => void;
  /**
   * Subscribes to `streams` and routes the responses to `callback`. The subscription is tracked
   * under `address`. Returns a function that unregisters the subscription again.
   */
  registerSubscription(
    address: string,
    streams: RequestStream[],
    callback: Subscription<TMessageType>['callback'],
    options?: { loadAll?: boolean; overrideParticipant?: string }
  ): () => void;
  /** Cancels the subscription tracked under `address`. */
  unregisterSubscription(address: string): void;
  /** Replaces the streams (and optionally the callback) of the subscription tracked under `address`. */
  updateSubscription(address: string, streams: RequestStream[], callback?: unknown): void;
  /** Requests the next page of data for the subscription tracked under `address`. */
  pageSubscription(address: string, options?: { loadAll?: boolean }): void;
  /** Sends `message` over the socket as-is. */
  sendMessage(message: object): void;
  /** Sends `message` with a `ts` timestamp field added. */
  sendTimedMessage(message: object): void;
  /** Sends a ping message carrying `data`. */
  ping(data: object): void;
  /** Stream of the pong messages received from the server. */
  pongs(): Observable<Response<{ ts: number }>>;

  /**
   * Notifies subscribers when the connection is opened (a "hello" event is received).
   * The emitted value is the session id.
   */
  onOpen: Subject<string | null>;
  /**
   * Notifies subscribers when an error message is received.
   * NOTE(review): `WebSocketClient` only emits here for internal-server-error codes — confirm that is the intended contract.
   */
  onError: Subject<ErrorSubscriptionResponse>;
  /**
   * Notifies subscribers when the connection is closed.
   */
  onClose: Subject<CloseEvent>;
}

/** Reconnect tuning accepted by `WebSocketClient.connect`. */
interface IWebSocketClientConnectOptions {
  /** Base delay in milliseconds for the exponential reconnect backoff. `connect` defaults this to 1000. */
  initialRetryInterval?: number;
  /** Upper bound in milliseconds for the delay between reconnect attempts. `connect` defaults this to 15000. */
  maxRetryInterval?: number;
  /** Passed as `count` to both the `retry` and the `repeat` operator. `connect` defaults this to Infinity. */
  maxRetryAttempts?: number;
}

/**
 * The message `type` values used for ping and pong messages. The client sends these in `ping`/`pong`
 * and compares incoming messages against them in `handleMessage`.
 */
type PingPongTypes = { ping: typeof AVA_PING | typeof WL_PING; pong: typeof AVA_PONG | typeof WL_PONG };

/**
 * Generic client for WebSocket connections throughout kyoko-based apps.
 *
 * Wraps an RxJS `WebSocketSubject` and adds:
 * - automatic reconnection with exponential backoff (`connect`),
 * - subscriptions tracked both by a caller-chosen `address` and by the request id sent to the server,
 * - one-way publications whose only expected response is an error,
 * - ping/pong handling, including a clock skew estimate when a performance collector is attached.
 *
 * TODO: WebsocketClient needs a lot of typing improvement, but it was the one remaining place in kyoko that needed it
 */
export class WebSocketClient<TMessageType = unknown> implements IWebSocketClient<TMessageType> {
  /**
   * Upper bound on the number of keys kept in `canceledSubscriptions`. Every cancel adds two keys
   * (reqid and address) and every reconnect re-registers all subscriptions, so without a bound the
   * map grows for as long as the page lives.
   */
  private static readonly MAX_CANCELED_SUBSCRIPTION_KEYS = 2000;

  public sessionId: string | null;
  public isConnected = false;
  public performance: WebSocketPerformance | null;

  private url?: string;
  private subscriptionsByAddress: Map<string, Subscription<TMessageType>>;
  private subscriptionsByReqID: Map<number, Subscription<TMessageType>>;
  /** Recently cancelled subscriptions, keyed by both reqid and address, used to silence late responses. */
  private canceledSubscriptions: Map<number | string, Subscription<TMessageType>>;
  /** Source of request ids; starts at 1 so that a reqid is never falsy. */
  private requestCount = 1;
  private socket?: WebSocketSubject<Response<TMessageType>>;
  public readonly onOpen: Subject<string | null>;
  public readonly onError: Subject<ErrorSubscriptionResponse>;
  public readonly onClose: Subject<CloseEvent>;
  private socketSubscription?: RxJsSubscription;
  private pongSubject: Subject<Response<{ ts: number }>>;
  private pongObservable: Observable<Response<{ ts: number }>>;
  // Customer Api and principal api have different ping/pong types.
  private pingPongTypes: PingPongTypes;

  /**
   * For each registerPublication invocation, we add the reqid we sent out here to keep track.
   * The value of the map is the onError callback. If it wasn't provided, the value will be null.
   */
  private publicationsByReqID: Map<number, ((err: unknown) => void) | null>;

  /**
   * @param options.pingPongTypes - Message types used for ping and pong; defaults to `AVA_PING` / `AVA_PONG`.
   */
  constructor({
    pingPongTypes = { ping: AVA_PING, pong: AVA_PONG },
  }: {
    pingPongTypes?: PingPongTypes;
  } = {}) {
    logger.info('[ws] Creating new WebSocketClient');
    this.subscriptionsByAddress = new Map();
    this.subscriptionsByReqID = new Map();
    this.canceledSubscriptions = new Map();
    this.sessionId = null;
    this.pingPongTypes = pingPongTypes;

    this.onOpen = new Subject();
    this.onError = new Subject();
    this.onClose = new Subject();
    this.performance = null;

    this.pongSubject = new Subject();
    this.pongObservable = this.pongSubject.asObservable();

    this.publicationsByReqID = new Map();
  }

  /**
   * Opens a socket to `url` and keeps it open: errors trigger `retry`, clean closes trigger `repeat`.
   * Any connection opened by an earlier `connect` call is closed first.
   *
   * @param url - WebSocket endpoint.
   * @param options - Backoff tuning. The n-th consecutive attempt waits
   *   `min(maxRetryInterval, initialRetryInterval * 2^(n-1))` milliseconds.
   */
  connect(
    url: string,
    {
      initialRetryInterval = 1000,
      maxRetryInterval = 15000,
      maxRetryAttempts = Infinity,
    }: IWebSocketClientConnectOptions = {}
  ) {
    // Without this, a second connect() would leave the previous subscription (and its socket) alive
    // and every message would be handled twice.
    this.close();

    logger.info('[ws] Creating new WebSocket', { extra: { url } });
    this.url = url;

    const socket: WebSocketSubject<Response<TMessageType>> = webSocket({
      url,
      openObserver: {
        next: () => {
          logger.info('[ws] WebSocket connected');
        },
      },
      closeObserver: {
        next: e => {
          logger.info(`[ws] WebSocket closed`, {
            extra: {
              code: e.code,
              message: getCloseCodeString(e.code as keyof typeof WEBSOCKET_CLOSE_CODES),
              reason: e.reason,
              wasClean: e.wasClean,
              sessionId: this.sessionId,
              connection: getConnectionData(),
            },
          });

          // A socket replaced via closeAndReconnect() can report its close long after the new
          // socket has completed its handshake; that late event must not mark us as disconnected.
          if (this.socket === socket) {
            this.isConnected = false;
          }
          this.onClose.next(e);
        },
      },
    });
    this.socket = socket;

    // RxJS passes a 1-based attempt counter to `delay`, so subtract one to make the first
    // attempt wait exactly `initialRetryInterval`.
    const backoff = (attempt: number) =>
      timer(Math.min(maxRetryInterval, initialRetryInterval * Math.pow(2, Math.max(0, attempt - 1))));

    this.socketSubscription = socket
      .pipe(
        // If there was an error starting, maintaining or closing the connection,
        // an `error` is emitted, in which case we `retry`.
        retry({
          count: maxRetryAttempts,
          delay: (_, numRetries) => backoff(numRetries),
          // Start the backoff over once a connection has delivered a message; otherwise every
          // failure over the lifetime of the page would keep pushing the delay towards the maximum.
          resetOnSuccess: true,
        }),

        // If the connection closes without any issues, a `complete` is emitted,
        // in which case we `repeat`.
        repeat<any>({
          count: maxRetryAttempts,
          delay: numRetries => backoff(numRetries),
        })
      )

      // Note that all we need to "force close" the connection is to unsubscribe from the observable.
      .subscribe({
        next: json => this.handleMessage(json),
        error: error => this.handleError(error),
      });
  }

  /**
   * Closes the current connection and connects again to the URL of the last `connect` call.
   *
   * @param connectArgs - Backoff tuning forwarded to `connect`.
   * @throws Error if `connect` has never been called.
   */
  closeAndReconnect(
    connectArgs: { initialRetryInterval?: number; maxRetryInterval?: number; maxRetryAttempts?: number } = {}
  ) {
    logger.info('[ws] Closing and attempting reconnect');
    const url = this.url;
    if (url == null) {
      throw new Error('Socket is not initialized');
    }
    this.close();
    this.connect(url, connectArgs);
  }

  /** Closes the connection (if any) by unsubscribing from the socket observable. */
  close() {
    if (this.socketSubscription != null) {
      this.socketSubscription.unsubscribe();
      this.socketSubscription = undefined;
    }
    // The close event only arrives asynchronously; stop treating the socket as usable right away.
    this.isConnected = false;
  }

  /** Stream of the pong messages received from the server. */
  pongs() {
    return this.pongObservable;
  }

  /**
   * Sends a one-way message ("publication") with a fresh reqid and remembers `onError` for it.
   *
   * @param request - Message body; a `reqid` field is added to it.
   * @param onError - Invoked with the raw message if the server answers this reqid with an error.
   * @returns A cleanup function that forgets the reqid; call it once no response is expected anymore.
   * @throws Error if `connect` has never been called.
   */
  registerPublication(request: object, onError?: (err: any) => void) {
    const reqid = this.requestCount++;
    this.publicationsByReqID.set(reqid, onError ?? null);
    this.sendMessage({ reqid, ...request });
    return () => {
      this.publicationsByReqID.delete(reqid);
    };
  }

  /**
   * Creates a subscription under `address`, sends the SUBSCRIBE request and routes responses for
   * its reqid to `callback`.
   *
   * If `address` is already in use this logs an error but still proceeds; the earlier subscription
   * then stays reachable by reqid only.
   *
   * @returns A function that unregisters the subscription.
   * @throws Error if `connect` has never been called.
   */
  registerSubscription(
    address: string,
    streams: RequestStream[],
    callback: SubscriptionCallback<TMessageType>,
    options?: {
      loadAll?: boolean;
      overrideParticipant?: string;
    }
  ) {
    if (this.subscriptionsByAddress.has(address)) {
      this.logError(`Subscription address ${address} is already in use`);
    }
    // This is the global Performance API, not `this.performance`; the mark is measured in handleMessage.
    performance.mark(`${address}-start`);
    const reqid = this.requestCount++;
    const overrideParticipant = options?.overrideParticipant;
    const sub = new Subscription<TMessageType>({
      address,
      streams,
      reqid,
      callback,
      loadAll: options?.loadAll,
      overrideParticipant,
    });
    this.subscriptionsByAddress.set(address, sub);
    this.subscriptionsByReqID.set(reqid, sub);

    // Register the subscription in both maps before sending, so a response can always be routed.
    this.sendTimedMessage({
      type: SUBSCRIBE,
      reqid,
      streams,
      overrideParticipant,
    });
    this.performance?.registerSubscription(reqid, streams);

    /** For convenience */
    return () => {
      this.unregisterSubscription(address);
    };
  }

  /**
   * Cancels the subscription tracked under `address`: sends CANCEL, disposes the subscription and
   * moves it to the cancelled list. Logs an error if no subscription is tracked under `address`.
   */
  unregisterSubscription(address: string) {
    const sub = this.subscriptionsByAddress.get(address);
    if (sub == null) {
      this.logError(`Subscription not found while trying to unregister subscription on address ${address}`);
      return;
    }

    const reqid = sub.reqid;
    // Note: we always send a `cancel` message, even if the sub is *believed* to be inactive
    // We just ignore `unknown req id` errors for any subs in our canceled subscriptions list
    // This way, we can be sure to cancel subscriptions that are still active on the server.
    this.sendTimedMessage({
      type: CANCEL,
      // Optional chaining keeps a subscription with no streams cancellable instead of throwing here.
      // NOTE(review): the rest of this file reads the lower-case `tag` field of a stream — confirm
      // that the capitalised `Tag` key is intended.
      tag: sub.streams[0]?.['Tag'],
      reqid,
    });

    // Dispose to avoid memory leak
    sub.dispose();

    // add to the canceled subs map, by both reqid and address
    this.rememberCanceledSubscription(reqid, sub);
    this.rememberCanceledSubscription(address, sub);
    this.subscriptionsByAddress.delete(address);
    this.subscriptionsByReqID.delete(reqid);
    this.performance?.unregisterSubscription(reqid);
  }

  /** Records a cancelled subscription under `key` and evicts the oldest keys beyond the size bound. */
  private rememberCanceledSubscription(key: number | string, sub: Subscription<TMessageType>) {
    this.canceledSubscriptions.set(key, sub);
    // A Map iterates in insertion order, so the first key is the oldest one.
    while (this.canceledSubscriptions.size > WebSocketClient.MAX_CANCELED_SUBSCRIPTION_KEYS) {
      const oldest = this.canceledSubscriptions.keys().next();
      if (oldest.done) {
        break;
      }
      this.canceledSubscriptions.delete(oldest.value);
    }
  }

  /**
   * Replaces the subscription tracked under `address` by cancelling it and registering a new one
   * with `streams`. `callback` is used when it is a function; otherwise the existing callback is kept.
   *
   * @returns The unregister function of the new subscription, or `undefined` if `address` is unknown.
   */
  updateSubscription(address: string, streams: RequestStream[], callback?: unknown) {
    const sub = this.subscriptionsByAddress.get(address);
    if (sub == null) {
      this.logError(`Subscription not found while trying to update subscription on address ${address}`);
      return;
    }

    // Read these before unregistering: unregisterSubscription() disposes `sub`, and nothing here
    // guarantees that a disposed subscription still exposes them.
    // NOTE(review): `overrideParticipant` is not carried over to the new subscription — confirm intended.
    const nextCallback = isFunction(callback) ? callback : sub.callback;
    const loadAll = sub.loadAll;

    // Since we can never quite be sure if an error response means that a subscription is active or not,
    // We amend subscriptions by cancelling the old one and sending a new subscribe request.
    // This also prevents us sending an `amend` that fails, and leaves the server with the old subscription
    // while the client side has the updated subscription details.
    // Instead, we'd now have one cancelled subscription, and one in-active subscription, which is consistent
    // with the server state.
    this.unregisterSubscription(address);
    return this.registerSubscription(address, streams, nextCallback, { loadAll });
  }

  /**
   * Requests the next page for the subscription tracked under `address`, if it has a `next` cursor.
   *
   * @param options.loadAll - Stored on the subscription; defaults to false.
   * @throws Error if the subscription has more than one stream.
   */
  pageSubscription(address: string, { loadAll = false } = {}) {
    const sub = this.subscriptionsByAddress.get(address);
    if (sub == null) {
      // Paging a subscription that was cancelled in the meantime is expected and not worth logging.
      if (!this.canceledSubscriptions.has(address)) {
        this.logError(`Subscription not found while trying to page on address ${address}`);
      }
      return;
    }

    const { reqid, streams, next } = sub;
    if (streams.length > 1) {
      throw new Error('Paging subscriptions with multiple streams is not supported');
    }
    const stream = streams[0];
    sub.loadAll = loadAll;
    if (next != null) {
      return this.sendTimedMessage({
        type: PAGE,
        reqid,
        streams: [{ name: stream.name, after: next }],
      });
    }
  }

  /**
   * Sends `message` over the socket as-is.
   *
   * @throws Error if `connect` has never been called.
   */
  sendMessage(message: any) {
    if (this.socket == null) {
      throw new Error('Socket is not initialized');
    }
    this.socket.next(message);
  }

  /** Sends `message` with a `ts` field set to the current time. */
  sendTimedMessage(message: any) {
    return this.sendMessage({
      ...message,
      ts: formattedDateForSubscription(new Date()),
    });
  }

  /** Sends a ping carrying `data`; does nothing while the client is not connected. */
  ping(data: any) {
    if (this.isConnected) {
      return this.sendTimedMessage({
        type: this.pingPongTypes.ping,
        reqid: this.requestCount++,
        data: [data],
      });
    }
  }

  /** Answers a server ping by echoing its `reqid` and `data` in a pong message. */
  pong(reqid: string, data: any) {
    return this.sendTimedMessage({
      type: this.pingPongTypes.pong,
      reqid,
      data,
    });
  }

  /** Handles a response to a publication: logs it and hands it to the registered error callback, if any. */
  private handlePublicationMessage(json: any) {
    // It should always be an error, but just making it explicit...
    if (json.type === 'error') {
      this.logError(JSON.stringify(json.error), { message: json });

      const publicationErrorCallback = json.reqid ? this.publicationsByReqID.get(json.reqid) : undefined;
      if (publicationErrorCallback) {
        publicationErrorCallback(json);
      } else {
        logger.error(new Error('Unhandled websocket publication error'), { extra: json });
      }
    }
  }

  /**
   * Routes one incoming message, checked in this order: publication responses, the "hello"
   * handshake, ping, pong, and finally subscription data or errors.
   */
  handleMessage(json: any) {
    // A frame that deserializes to null/undefined carries nothing that could be routed.
    if (json == null) {
      return;
    }

    const sub = this.subscriptionsByReqID.get(json.reqid);
    const reqSub = sub ?? this.canceledSubscriptions.get(json.reqid);

    // not all received messages will be related to a subscription. Some will be related to publications. These are one-way ui-to-backend messages
    // which can still receive responses (which should only happen in the case of an error). If we hit a publication message here, lets branch off into a different handler and exit.
    const isResponseToPublication = json.reqid ? this.publicationsByReqID.has(json.reqid) : undefined;
    if (isResponseToPublication) {
      this.handlePublicationMessage(json);
      return;
    }

    if (json.type === 'hello') {
      this.sessionId = json.session_id;
      if (this.performance) {
        this.performance.sessionId = this.sessionId;
      }
      this.isConnected = true;

      // Collect any active subscriptions (a snapshot, because re-subscribing mutates the map)
      const activeSubscriptions = [...this.subscriptionsByAddress.entries()];

      // Re-subscribe to any active subscriptions
      for (const [address, subscription] of activeSubscriptions) {
        this.updateSubscription(address, subscription.streams, subscription.callback);
      }

      logger.info('[ws] WebSocket handshake complete', { extra: { sessionId: this.sessionId } });

      return this.onOpen.next(this.sessionId);
    }

    if (json.type === this.pingPongTypes.ping) {
      return this.pong(json.reqid, json.data);
    }

    if (json.type === this.pingPongTypes.pong) {
      if (this.performance) {
        // clock skew estimate:
        // pingPayload - serverTS - (now - pingPayload) / 2
        // this is just an estimate as it does not account for processing time on the server. We assume for simple messages
        // this is much lower than our network latency.
        const pingPayload = new Date(json.data?.[0]?.ts).getTime();
        const skew = pingPayload - new Date(json.ts).getTime() - (new Date().getTime() - pingPayload) / 2;
        // A pong without usable timestamps yields NaN, which would poison every later latency value.
        if (Number.isFinite(skew)) {
          this.performance.clockSkew = skew;
        }
      }
      this.pongSubject.next(json);
      return;
    }

    if (this.performance) {
      this.performance.onMessage(json);
    }

    // Errors with one of these codes are never logged, and neither is any error for a reqid in our
    // cancelled list. The latter usually indicates that we tried to cancel a subscription that
    // never got setup correctly (e.g. invalid stream, etc.).
    const isIgnoredErrorCode = [
      WebsocketErrorCodeEnum.WebsocketErrorUnknownReqID,
      WebsocketErrorCodeEnum.WebsocketErrorSubscriptionLimit,
      WebsocketErrorCodeEnum.WebsocketErrorPermissionDenied,
    ].includes(json?.error?.code);
    const isCancelledSub = this.canceledSubscriptions.has(json.reqid);

    const shouldLogError = json.type === 'error' && !isIgnoredErrorCode && !isCancelledSub;

    if (shouldLogError) {
      this.logError(JSON.stringify(json.error), {
        subscription: {
          address: reqSub?.address,
          reqid: reqSub?.reqid,
          streams: reqSub?.streams,
          active: reqSub?.active,
          next: reqSub?.next,
          tag: reqSub?.streams?.map(s => s.tag).join(', '),
          cancelled: isNil(sub) && !isNil(reqSub),
        },
      });

      // An error message is not guaranteed to carry an `error` object.
      const errorCode = json.error?.code;

      // mark this sub as inactive, so we won't try to unsubscribe later
      if (sub && ErrorCodesCausingInactiveSubscription.has(errorCode)) {
        sub.active = false;
      }

      if (errorCode === WebsocketErrorCodeEnum.WebsocketErrorInternalServerError) {
        this.onError.next(json);
      }
    }

    if (json.type === 'error') {
      const callback = sub?.callback;
      if (callback != null) {
        return callback(json);
      }
    }

    if (isNil(sub) && isCancelledSub) {
      // Sometimes the backend manages to send out messages right before
      // our `cancel` message has reached it
      return;
    }

    if (isNil(sub)) {
      this.logError('Subscription not found for request id', {
        level: 'info',
        extra: { json },
      });
    } else {
      sub.active = true; // in case it was inactive previously
      const { loadMore } = sub.handleJson(json);
      if (loadMore) {
        this.pageSubscription(sub.address, { loadAll: true });
      } else {
        if (json.initial) {
          // Global Performance API: measure from the mark set in registerSubscription.
          performance.mark(`${sub.address}-end`);
          performance.measure(`sub-${json.type}-${json.tag}`, `${sub.address}-start`, `${sub.address}-end`);
        }
      }
    }
  }

  /** Receives errors that reach the subscriber of the socket observable set up in `connect`. */
  handleError(error: any) {
    console.warn('websocket error:', error);
  }

  /** Reports `message` as an Error to the logger, tagged with this client's url and session id. */
  logError = (message: string, context?: LogContext) => {
    return logger.error(new Error(message), {
      websocket: {
        url: this.url,
        sessionId: this.sessionId,
      },
      ...context,
    });
  };
}

declare global {
  // Typing for https://developer.mozilla.org/en-US/docs/Web/API/NetworkInformation
  // NOTE(review): presumably these fields are not in the bundled TypeScript DOM typings because of
  // limited browser support (they were added and then removed) — confirm against the TS version in use.
  interface NetworkInformation {
    type?: string;
    downlink?: number;
    effectiveType?: string;
    rtt?: number;
  }
}

/** Per-subscription bookkeeping kept by `WebSocketPerformance` while subscription logging is enabled. */
interface SubscriptionInfo {
  /** The first of the requestStreams, primarily used for typing to show that we always have at least one stream for this object to exist. */
  primaryStream: RequestStream;

  /** The raw request stream objects, these won't show up properly in the console table, but we can expand the object underneath the table to drill into these if needed. */
  requestStreams: RequestStream[];

  /** Total number of messages received for this subscription. */
  totalMessages: number;

  /** Messages received for this subscription, collected only if its primary stream is listed in the captured-streams local storage config. */
  capturedMessages: unknown[];

  /** Total number of data records received for this subscription. */
  totalRecords: number;

  /** Number of data records received since the last timed reset (every second). */
  recordsSinceLastSecond: number;

  /** Number of data records received in the previous second. Consider changing this to a moving average. */
  recordsInLastSecond: number;

  /** Time when the subscription was first subscribed to. */
  subscribedAt: Date;

  /** Time from subscribing until the first message that carries no `next` cursor was received, i.e. the first message of an unpaged subscription or the last page of a paged one. */
  loadTimeMs?: number;

  /** Time when the subscription was unsubscribed from. */
  unsubscribedAt?: Date;
}

/** Subscription bookkeeping keyed by request id. */
type SubscriptionInfos = { [reqid: string]: SubscriptionInfo };

/** Flattened, display-oriented view of one subscription, as published by `WebSocketPerformance`. */
export interface SubscriptionStats {
  /** Request id of the subscription. */
  reqId: string;
  /** Name of the subscription's primary (first) stream. */
  sub: string;
  /** Tag of the subscription's primary stream. */
  tag: string;
  /** Total number of messages received for this subscription. */
  totalMessages: number;
  /** Total number of data records received for this subscription. */
  totalRecords: number;
  /** Number of data records received in the previous second. */
  recordsLastSecond: number;
  /** Load time in milliseconds, once it has been determined. */
  loadTimeMs?: number;
  /** Subscription time, formatted with `toLocaleTimeString()`. */
  subscribedTime: string;
  /** Unsubscription time, formatted with `toLocaleTimeString()`; absent while the subscription is active. */
  unsubscribedTime?: string;
  /** The raw request stream objects of the subscription. */
  requestStreams: RequestStream[];
  /** Whether the primary stream is in the set of captured streams. */
  isCapturedStream: boolean;
  /** The messages captured for this subscription. */
  capturedMessages: unknown[];
}
// localStorage key holding the comma-separated names of the streams whose messages are captured.
const CAPTURED_STREAMS_KEY = 'stats.capturedStreams';
// localStorage key of the feature flag; subscription info is logged only while its value is the string 'true'.
const LOG_SUBSCRIPTION_INFO_KEY = 'stats.logSubscriptionInfo';
// localStorage key holding the ISO timestamp after which subscription logging is switched off automatically.
const LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY = 'stats.logSubscriptionInfoTimeLimit';

/**
 * Collects WebSocket throughput and latency statistics and publishes a snapshot once per second
 * while running (see `start` / `stop`).
 *
 * Per-subscription details are only tracked while the `stats.logSubscriptionInfo` localStorage
 * flag is set, because that bookkeeping grows for as long as it is enabled.
 */
export class WebSocketPerformance {
  sessionId: string | null = null;
  /** Handle of the once-per-second timer, or null while stopped. */
  interval: ReturnType<typeof setInterval> | null = null;
  subject = new Subject<{
    messagesPerSecond: number;
    latencyPerSecond: number;
    activeSubscriptions: number;
    activeSubscriptionStats: SubscriptionStats[];
    canceledSubscriptions: number;
    canceledSubscriptionStats: SubscriptionStats[];
  }>();

  // estimate of the difference between the client and server clocks. This number (ms) should be subtracted from any
  // timestamp received from the server if comparing it to a local timestamp.
  clockSkew = 0;

  // Only send latency warnings once every 60s
  didSendLatencyWarning = false;
  latencySinceLastReset = 0;
  messagesSinceLastReset = 0;
  latencyPerSecond = 0;
  messagesPerSecond = 0;
  started = false;

  /** Number of messages that contributed to `latencySinceLastReset`, i.e. had a parsable `ts`. */
  private latencySamplesSinceLastReset = 0;

  private subscriptionInfos: SubscriptionInfos = {};

  private capturedStreams: Set<string>;

  activeSubscriptions = 0;
  canceledSubscriptions = 0;

  isLogSubscriptionInfo$ = new BehaviorSubject<boolean>(this.isLogSubscriptionInfo());

  constructor() {
    this.sessionId = null;
    const storedStreams = localStorage.getItem(CAPTURED_STREAMS_KEY);
    // An empty stored value would otherwise split into [''] and add an empty stream name.
    this.capturedStreams = new Set((storedStreams ?? '').split(',').filter(name => name.length > 0));
  }

  /** Clears the per-second accumulators. */
  reset() {
    this.messagesSinceLastReset = 0;
    this.latencySinceLastReset = 0;
    this.latencySamplesSinceLastReset = 0;
  }

  /** Returns the subject on which the once-per-second snapshots are published. */
  stats() {
    return this.subject;
  }

  /** Counts a new active subscription and, if subscription logging is enabled, starts tracking it. */
  registerSubscription(reqid: number, streams: RequestStream[]) {
    this.activeSubscriptions++;

    if (this.isLogSubscriptionInfo() && streams.length > 0) {
      this.subscriptionInfos[reqid] = {
        primaryStream: streams[0],
        requestStreams: streams,
        totalMessages: 0,
        capturedMessages: [],
        totalRecords: 0,
        recordsInLastSecond: 0,
        recordsSinceLastSecond: 0,
        subscribedAt: new Date(),
      };
    }
  }

  /** Feeds one received message into the per-second counters and the per-subscription bookkeeping. */
  onMessage(json: any) {
    const { reqid, next, ts } = json;
    if (ts) {
      // We need to subtract the clock skew from the server timestamp to get the correct latency
      const latency = new Date().getTime() - Date.parse(ts) - this.clockSkew;
      // An unparsable timestamp yields NaN, which would turn the running sum into NaN.
      if (Number.isFinite(latency)) {
        this.latencySinceLastReset += latency;
        this.latencySamplesSinceLastReset += 1;
      }
    }
    this.messagesSinceLastReset += 1;

    const subscriptionInfo = this.subscriptionInfos[reqid];
    // this will be undefined if isLogSubscriptionInfo() is false
    if (subscriptionInfo) {
      const recordsLength = json.data?.length ?? 0;
      subscriptionInfo.totalMessages++;
      const streamName = subscriptionInfo.primaryStream.name;
      if (this.isCapturedStream(streamName)) {
        subscriptionInfo.capturedMessages.push(json);
      }
      subscriptionInfo.totalRecords += recordsLength;
      subscriptionInfo.recordsSinceLastSecond += recordsLength;
      // Compare against undefined rather than falsiness so a measured load time of 0 ms is kept.
      if (!next && subscriptionInfo.loadTimeMs === undefined) {
        subscriptionInfo.loadTimeMs = new Date().getTime() - subscriptionInfo.subscribedAt.getTime();
      }
    }
  }

  /** Moves one subscription from the active to the cancelled count and timestamps it if tracked. */
  unregisterSubscription(reqid: number) {
    this.activeSubscriptions--;
    this.canceledSubscriptions++;

    const subscription = this.subscriptionInfos[reqid];
    if (subscription) {
      subscription.unsubscribedAt = new Date();
    }
  }

  /**
   * Builds the display rows for either the cancelled (`unsubscribed = true`) or the still active
   * tracked subscriptions. Returns an empty list while subscription logging is disabled.
   */
  private getSubscriptionStats(unsubscribed: boolean): SubscriptionStats[] {
    if (!this.isLogSubscriptionInfo()) {
      return [];
    }
    const filteredSubscriptions = Object.entries(this.subscriptionInfos).filter(
      ([, info]) => (info.unsubscribedAt != null) === unsubscribed
    );

    return filteredSubscriptions.map(([reqid, info]) => {
      return {
        reqId: reqid,
        sub: info.primaryStream.name,
        tag: info.primaryStream.tag,
        totalMessages: info.totalMessages,
        totalRecords: info.totalRecords,
        recordsLastSecond: info.recordsInLastSecond,
        loadTimeMs: info.loadTimeMs,
        subscribedTime: info.subscribedAt?.toLocaleTimeString(),
        unsubscribedTime: info.unsubscribedAt?.toLocaleTimeString(),
        requestStreams: info.requestStreams,
        isCapturedStream: this.isCapturedStream(info.primaryStream.name),
        capturedMessages: info.capturedMessages,
      };
    });
  }

  /**
   * Starts the once-per-second timer. Calling it again replaces the running timer instead of
   * stacking a second one.
   */
  start() {
    if (this.interval != null) {
      clearInterval(this.interval);
    }
    this.started = true;
    this.interval = setInterval(() => this.tick(), 1000);
  }

  /** One timer tick: finalises the per-second numbers, publishes them and resets the accumulators. */
  private tick() {
    this.messagesPerSecond = this.messagesSinceLastReset;

    if (this.isLogSubscriptionInfoTimeLimitExceeded()) {
      this.setLogSubscriptionInfo(false);
    }
    if (this.isLogSubscriptionInfo()) {
      values(this.subscriptionInfos).forEach(info => {
        info.recordsInLastSecond = info.recordsSinceLastSecond;
        info.recordsSinceLastSecond = 0;
      });
    }

    // Average over the messages that actually carried a timestamp; dividing by all messages would
    // understate the latency whenever some messages have none.
    this.latencyPerSecond =
      this.latencySamplesSinceLastReset > 0
        ? Math.round(this.latencySinceLastReset / this.latencySamplesSinceLastReset)
        : 0;

    this.subject.next({
      messagesPerSecond: this.messagesPerSecond,
      latencyPerSecond: this.latencyPerSecond,
      activeSubscriptions: this.activeSubscriptions,
      activeSubscriptionStats: this.getSubscriptionStats(false),
      canceledSubscriptions: this.canceledSubscriptions,
      canceledSubscriptionStats: this.getSubscriptionStats(true),
    });

    if (this.latencyPerSecond > 0) {
      logger.trackDuration('responseLatencyAvg', {
        startTime: new Date().getTime(),
        duration: this.latencyPerSecond,
        context: {
          activeSubscriptions: this.activeSubscriptions,
          messagesPerSecond: this.messagesPerSecond,
        },
      });
    }

    this.reset();
  }

  /** Stops the once-per-second timer. */
  stop() {
    this.started = false;
    if (this.interval != null) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }

  /** Whether per-subscription logging is enabled via the localStorage flag. */
  isLogSubscriptionInfo() {
    // only log subscription info when feature flag is set as subscription info will grow indefinitely over time.
    return localStorage.getItem(LOG_SUBSCRIPTION_INFO_KEY) === 'true';
  }

  /** Whether the stored time limit for subscription logging lies in the past; false when none is stored. */
  isLogSubscriptionInfoTimeLimitExceeded() {
    // we want to do this in case someone has accidentally left subscription logging on, which will eventually cause an OOM.
    const timeLimit = localStorage.getItem(LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY);
    if (!timeLimit) {
      return false;
    }
    return new Date(timeLimit) < new Date();
  }

  /**
   * Enables or disables per-subscription logging. Disabling also drops everything collected so far
   * and clears the captured-streams configuration; enabling stores a time limit one hour from now.
   */
  setLogSubscriptionInfo(enabled: boolean) {
    if (!enabled) {
      this.subscriptionInfos = {};
      localStorage.removeItem(LOG_SUBSCRIPTION_INFO_KEY);
      this.capturedStreams.clear();
      localStorage.removeItem(CAPTURED_STREAMS_KEY);
      localStorage.removeItem(LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY);
    } else {
      localStorage.setItem(LOG_SUBSCRIPTION_INFO_KEY, 'true');
      // save UTC date one hour from now to localStorage to automatically stop logging subscription info after an hour.
      // Adding milliseconds (instead of bumping the local hour) stays exactly one hour across DST changes.
      const stopTime = new Date(Date.now() + 60 * 60 * 1000);
      localStorage.setItem(LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY, stopTime.toISOString());
    }
    this.isLogSubscriptionInfo$.next(enabled);
  }

  /** Whether messages of `stream` are captured; false for a missing or empty stream name. */
  isCapturedStream(stream?: string) {
    return stream ? this.capturedStreams.has(stream) : false;
  }

  /** Adds `stream` to or removes it from the captured streams and persists the list to localStorage. */
  setCapturedStream(stream: string, enabled: boolean) {
    if (enabled) {
      this.capturedStreams.add(stream);
    } else {
      this.capturedStreams.delete(stream);
    }
    localStorage.setItem(CAPTURED_STREAMS_KEY, [...this.capturedStreams].join(','));
  }
}
