import type { EntityAdapter, EntityState } from '@reduxjs/toolkit';
import type { EndpointBuilder, QueryDefinition } from '@reduxjs/toolkit/query';
import { logger, wsWaitForAllPages, type SubscriptionResponse } from '@talos/kyoko';
import type { Draft } from 'immer';
import { isObject } from 'lodash-es';
import { combineLatest, firstValueFrom, map, share, startWith, Subject } from 'rxjs';
import { v1 as uuid } from 'uuid';
import type { AppExtraArgument } from './types';

/**
 * A factory function that builds a query definition for a WebSocket endpoint, and wires up the necessary
 * logic to handle the subscription.
 * @param builder
 * @see https://redux-toolkit.js.org/rtk-query/usage/streaming-updates
 */
export function buildWsEndpointBuilder<Builder extends EndpointBuilder<any, any, any>>(
  builder: Builder,
  {
    pagingUniqueKeyCallback,
  }: {
    pagingUniqueKeyCallback?: (data: any) => string;
  } = {}
) {
  /**
   * Map of cache keys to promises that resolve when the initial data is received
   * This is used to ensure that the `isLoading` state is correct.
   *
   * If the `queryFn` resolve immediately, the `isLoading` state would be `true`, even though
   * we haven't received the initial WebSocket data yet.
   */
  const promises = new Map<
    CacheKey,
    {
      resolve: (value: QueryReturnValue<any, any, any>) => void;
      reject: (reason?: any) => void;
      promise: Promise<QueryReturnValue<any, any, any>>;
    }
  >();

  /**
   * Sometimes the `queryFn` is called first (for example on retries, or force fetches),
   * and sometimes the `onCacheEntryAdded` is called first (when the cache entry is added).
   *
   * This little utility help us cater to both cases.
   */
  function getOrCreatePromise(cacheKey: CacheKey) {
    if (promises.has(cacheKey)) {
      return promises.get(cacheKey)!;
    }
    const { promise, resolve, reject } = Promise.withResolvers<QueryReturnValue<any, any, any>>();
    promises.set(cacheKey, { promise, resolve, reject });
    return { promise, resolve, reject };
  }

  /**
   * Loosely resembles the default createApi#query function, but with a narrower API surface.
   *
   * @see https://redux-toolkit.js.org/rtk-query/api/createApi#query
   */
  function query<TResult, TQuery>(
    definition: WsQueryEndpointDefinition<TQuery, TResult>
  ): QueryDefinition<TQuery, any, any, TResult, ExtractReducerPath<Builder>> {
    const { buildRequest, serializeQueryArgs = defaultSerializeWsQueryArgs, ...args } = definition;

    return builder.query<TResult, TQuery>({
      /**
       * The queryFn controls the `isFetching` and `isLoading` state.
       * As we know, WebSocket subscriptions doesn't really operate on a request/response basis,
       * so we mimic that behavior by returning a promise that resolves when the initial data is received.
       */
      queryFn(args) {
        const cacheKey = serializeQueryArgs(args);
        const { promise } = getOrCreatePromise(cacheKey);
        return promise;
      },

      /**
       * This is called each time a new cache _key_ (not the value) is added to the cache.
       */
      async onCacheEntryAdded(args, api) {
        const { updateCachedData, cacheEntryRemoved } = api;
        const cacheKey = serializeQueryArgs(args);

        const promiseEntry = getOrCreatePromise(cacheKey);
        const firstDataLoaded = promiseEntry;

        // Set up WS subscription
        const address = uuid();
        const request = buildRequest(args);
        const packetCallback: Parameters<typeof wsClient.registerSubscription>[2] = (err: any, json) => {
          if (err != null) {
            firstDataLoaded.resolve({ error: 'error' in err ? err.error?.msg : 'An unknown error occurred.' });
            return;
          }

          if (json == null) {
            firstDataLoaded.resolve({ error: 'An unknown error occurred.' });
            logger.error(new Error('Received empty response from WebSocket'), { extra: { address, request } });
            return;
          }

          if (json.initial) {
            if ('entityAdapter' in definition) {
              firstDataLoaded.resolve({
                data: definition.entityAdapter.addMany(definition.entityAdapter.getInitialState(), json.data),
              });
            } else {
              firstDataLoaded.resolve({ data: definition.initializeCache(json.data) });
            }
            promises.delete(cacheKey);
          } else {
            updateCachedData(draft => {
              if ('entityAdapter' in definition) {
                definition.entityAdapter.upsertMany(draft as Draft<any>, json.data);
              } else {
                definition.updateCache(draft, json.data);
              }
            });
          }
        };

        const extra = api.extra as AppExtraArgument;
        const wsClient = extra.wsClient;

        // Create a subject to process callback data
        const callbackSubject = new Subject<{ err: any; json: any }>();

        // Share the original stream so both branches see the same items
        const shared$ = callbackSubject.pipe(share());

        // Branch 1: extract the json for wsWaitForAllPages
        const json$ = shared$.pipe(
          map(item => item.json),
          pagingUniqueKeyCallback
            ? wsWaitForAllPages({
                getUniqueKey: pagingUniqueKeyCallback,
              })
            : map(item => item)
        );

        // Branch 2: extract the error value, starting with null if none has come through
        const error$ = shared$.pipe(
          map(item => item.err),
          startWith(null)
        );

        // Combine the results so that the final output includes both
        const subjectSubscription = combineLatest([json$, error$])
          .pipe(map(([json, err]) => ({ err, json })))
          .subscribe(({ err, json }: { err: any; json: any }) => {
            packetCallback(err, json as SubscriptionResponse<unknown>);
          });

        if (!wsClient.isConnected) {
          // Wait until `onOpen` has emitted at least one value
          await firstValueFrom(wsClient.onOpen);
        }
        const unsubscribe = wsClient.registerSubscription(address, [request], (err: any, json: any) => {
          callbackSubject.next({ err, json });
        });

        try {
          await cacheEntryRemoved;
          promises.delete(cacheKey);
        } finally {
          unsubscribe();
          subjectSubscription.unsubscribe();
          callbackSubject.unsubscribe();
        }
      },
      serializeQueryArgs: ({ queryArgs }) => serializeQueryArgs(queryArgs),
      ...args,
    });
  }

  /**
   * The keen reader will note that there's no `mutation` function here.
   * We'll see if it makes sense to add in the future.
   */
  return { query };
}

/**
 * Deletes the tag from the query args, and serializes the rest.
 */
export function defaultSerializeWsQueryArgs(args: unknown) {
  if (isObject(args) && 'tag' in args) {
    const { tag, ...rest } = args;
    return JSON.stringify(rest);
  }
  return JSON.stringify(args);
}

// Public types
export type WsQueryEndpointBaseDefinition<TQuery> = {
  /** Construct the WebSocket request (subscription) stream object */
  buildRequest: (args: TQuery) => any;
  /** Serialize the query args to a cache key. */
  serializeQueryArgs?: (args: TQuery) => string;
  /** Optionally specify how long to keep unused data in the cache. */
  keepUnusedDataFor?: number;
};

export type WsQueryEndpointDefinition<TQuery, TResult> =
  /**
   * Provide an `entityAdapter` that magically updates the state
   * for you (assuming the data is an array of entities).
   */

  | (WsQueryEndpointBaseDefinition<TQuery> & {
      entityAdapter: RequireEntityAdapter<TResult>;
    })

  /** If you need more granular control over the cache initialization, use this. */
  | (WsQueryEndpointBaseDefinition<TQuery> & {
      /** Initialize the cache with the data received from the WebSocket. */
      initializeCache: <TData extends any[]>(data: TData) => TResult;
      /** Handle updates coming from the WebSocket. */
      updateCache: RecipeWithData<TResult>;
    });

// Private types - can probably be cleaned up.
type RequireEntityAdapter<T> = T extends EntityState<infer Entity, infer ID> ? EntityAdapter<Entity, ID> : never;
/**
 * Extract the reducer path from an endpoint builder.
 * We need this to ensure that the various utils have the right type.
 */

type ExtractReducerPath<Builder> = Builder extends EndpointBuilder<any, any, infer R> ? R : never;
type CacheKey = string | number | boolean | Record<any, any>;
type MaybeDrafted<T> = T | Draft<T>;
type RecipeWithData<TState, TData = any> = (draft: MaybeDrafted<TState>, data: TData[]) => void | MaybeDrafted<TState>;
export type QueryReturnValue<TData = unknown, TError = unknown, TMeta = unknown> =
  | {
      error: TError;
      data?: undefined;
      meta?: TMeta;
    }
  | {
      error?: undefined;
      data: TData;
      meta?: TMeta;
    };
