import { isEqual } from 'lodash-es';
import { filter, map, pipe, scan } from 'rxjs';
import type { MinimalSubscriptionResponse } from '../types/SubscriptionResponse';
import { jsonDefined } from './utils';
import type { WSSubscriptionMemoryOutput } from './wsSubscriptionMemory';

/**
 * Options accepted by `wsPickFromMemoryStream`.
 *
 * @typeParam T - Item type carried in the incoming message's `data`.
 * @typeParam R - Type produced by `pick`.
 */
interface WSPickFromMemoryStreamParams<T, R> {
  /** Maps an item to the value that is compared against its previous version and forwarded downstream. */
  pick: (item: T) => R;
  /** Returns the key under which the item's entry is looked up in the diff. */
  getUniqueKey: (item: T) => string;
}

/**
 * Creates an operator chain that reduces a subscription-memory stream to the picked portion
 * of each item and only emits when that portion has changed.
 *
 * For every incoming `{ diff, latestMessage }`:
 * - each item of `latestMessage.data` is run through `pick`;
 * - an item counts as changed when `diff` holds no entry (or an entry without `prev`) under
 *   `getUniqueKey(item)`, or when the picked previous value is not deeply equal (lodash
 *   `isEqual`) to the picked current value;
 * - a value is forwarded when `latestMessage.initial` is truthy or at least one item changed.
 *
 * The forwarded value is a shallow copy of `latestMessage` whose `data` contains only the
 * picked values of the changed items (which may be empty for an initial message).
 *
 * @param params - `pick` selects the compared/forwarded part of an item; `getUniqueKey`
 *   yields the key used to find the item's entry in the diff.
 * @returns An operator chain built with `pipe` that consumes `WSSubscriptionMemoryOutput<T>` values.
 */
export function wsPickFromMemoryStream<T, R>({ pick, getUniqueKey }: WSPickFromMemoryStreamParams<T, R>) {
  return pipe(
    scan(
      (state, { diff, latestMessage }: WSSubscriptionMemoryOutput<T>) => {
        // An initial message is always forwarded, whether or not any entry differs.
        const isInitial = Boolean(latestMessage.initial);

        const changedData: R[] = [];
        for (const update of latestMessage.data) {
          const pickedUpdate = pick(update);
          const previousItem = diff?.get(getUniqueKey(update))?.prev;

          // Without a previous version there is nothing to compare against, so the entry is
          // treated as changed; otherwise compare the picked previous value with the picked update.
          if (previousItem == null || !isEqual(pick(previousItem), pickedUpdate)) {
            changedData.push(pickedUpdate);
          }
        }

        const changed = isInitial || changedData.length > 0;

        // Keep the previously built message when nothing changed on this emission.
        const latestPickedMessage: MinimalSubscriptionResponse<R> | undefined = changed
          ? { ...latestMessage, data: changedData }
          : state.latestPickedMessage;

        return { changed, latestPickedMessage };
      },
      {
        changed: false,
        latestPickedMessage: undefined as MinimalSubscriptionResponse<R> | undefined,
      }
    ),
    // Drop emissions for which the scan step reported no change.
    filter((state) => state.changed),
    // Downstream consumers only need the message itself.
    map((state) => state.latestPickedMessage),
    // Remove the `undefined` case so the output type is the message alone.
    filter(jsonDefined)
  );
}
