import { Observable, type OperatorFunction } from 'rxjs';
import { UpdateActionEnum } from '../types';
import type { MinimalSubscriptionResponse } from '../types/SubscriptionResponse';

interface wsStitchWithParams<PrimaryType, SecondaryType, OutputType> {
  getPrimaryTypeKey: (item: PrimaryType) => string;
  getSecondaryTypeKey: (item: SecondaryType) => string;
  secondarySource: Observable<MinimalSubscriptionResponse<SecondaryType>>;
  stitch: (primaryItem: PrimaryType, secondaryItem: SecondaryType | undefined) => OutputType;
}

/**
 * `wsStitchWith` is used to stitch data from two sources together.
 *
 * The primary source (the observable we are piped off of) is the data source determining the output records.
 * The secondary source data will only ever enrich primary source records.
 * This pipe stitches, as opposed to joins, data. Stitching means that records are stitched together 1:1, meaning that the primary and secondary keys
 * need to be unique within their streams respectively.
 *
 * Not supported: `UpdateAction.Delete` messages. This pipe does not work with streams which rely on the frontend understanding deletion messages.
 *
 * Future work:
 * - Extend the pipe to allow an array of secondarySources
 */
export function wsStitchWith<PrimaryType, SecondaryType, OutputType>({
  getPrimaryTypeKey,
  getSecondaryTypeKey,
  secondarySource,
  stitch,
}: wsStitchWithParams<PrimaryType, SecondaryType, OutputType>): OperatorFunction<
  MinimalSubscriptionResponse<PrimaryType>,
  MinimalSubscriptionResponse<OutputType>
> {
  return primarySource => {
    return new Observable<MinimalSubscriptionResponse<OutputType>>(output => {
      const primaryEntities = new Map<string, PrimaryType>();
      let secondaryEntities = new Map<string, SecondaryType>();

      const primarySub = primarySource.subscribe(message => {
        const outputData: OutputType[] = [];

        if (message.initial) {
          primaryEntities.clear();
        }

        for (const update of message.data) {
          const key = getPrimaryTypeKey(update);

          // @ts-expect-error - needs stronger typing clarity - UpdateAction not tied to PrimaryType's base
          const maybeUpdateAction: UpdateActionEnum | undefined = update['UpdateAction'];
          const isDeletion = maybeUpdateAction === UpdateActionEnum.Remove;

          if (isDeletion) {
            // Delete the previously stored entity from the map, but still forward the Deletion message downstream
            primaryEntities.delete(key);
            outputData.push(stitch(update, secondaryEntities.get(key)));
          } else {
            // Normal Update
            primaryEntities.set(key, update);
            outputData.push(stitch(update, secondaryEntities.get(key)));
          }
        }

        output.next({
          initial: message.initial,
          data: outputData,
        });
      });

      const secondarySub = secondarySource.subscribe(message => {
        const outputData: OutputType[] = [];

        if (message.initial) {
          // If we receive an initial: true message on the secondary stream, we need to remove all stitching we have done so to speak and re-apply new ones.
          // For each stitch we have done previously, we'll have an entry currently in secondaryEntities map. Using that, we can resolve which
          // primaryEntities we need to send un-stitched updates for.
          const newSecondaryEntities = new Map<string, SecondaryType>(
            message.data.map(update => [getSecondaryTypeKey(update), update])
          );

          // Create a unique list of all the relevant keys. The previously-stitched secondary entities and the new ones.
          const allSecondaryEntityKeys = new Set([...secondaryEntities.keys(), ...newSecondaryEntities.keys()]);
          for (const key of allSecondaryEntityKeys) {
            const primaryEntity = primaryEntities.get(key);
            if (primaryEntity) {
              // We'll either stitch with the new secondary entity or undefined.
              outputData.push(stitch(primaryEntity, newSecondaryEntities.get(key)));
            }
          }

          // Overwrite the old (now outdated) secondary entities with the new ones.
          secondaryEntities = newSecondaryEntities;
        } else {
          // Message not initial. What we do here is we just for each secondary entity see what primary entities we need to send updates for.
          for (const update of message.data) {
            const key = getSecondaryTypeKey(update);
            // @ts-expect-error - needs stronger typing clarity - UpdateAction is not tied to SecondaryType's base
            const maybeUpdateAction: UpdateActionEnum | undefined = update['UpdateAction'];
            const isDeletion = maybeUpdateAction === UpdateActionEnum.Remove;

            if (isDeletion) {
              secondaryEntities.delete(key);
              const primaryEntity = primaryEntities.get(key);
              if (primaryEntity) {
                outputData.push(stitch(primaryEntity, undefined)); // send the un-stitched entity out
              }
            } else {
              // Is a normal update operation
              secondaryEntities.set(key, update);
              const primaryEntity = primaryEntities.get(key);
              if (primaryEntity) {
                outputData.push(stitch(primaryEntity, update));
              }
            }
          }
        }

        if (outputData.length > 0) {
          output.next({
            initial: false,
            data: outputData,
          });
        }
      });

      return () => {
        primarySub.unsubscribe();
        secondarySub.unsubscribe();
      };
    });
  };
}
