import { useRef } from "react"
import type {
  QueryFunction,
  QueryFunctionContext,
  QueryKey,
} from "@tanstack/react-query"
import { hashKey, useQueryClient } from "@tanstack/react-query"
import { firstValueFrom, Observable, of } from "rxjs"
import { catchError, finalize, share, skip, tap } from "rxjs/operators"
import { cleanupSubscription, storeSubscription } from "./storage"

/** Page parameter type used by the query function context and forwarded to `dataUpdater`. */
type TPageParam = string | number

/**
 * Value returned by `useObservableQueryFn`.
 */
export interface UseObservableQueryFnResult<
  TSubscriptionFnData = unknown,
  TSubscriptionKey extends QueryKey = QueryKey,
> {
  /** Query function to hand to React Query; its page params are `TPageParam`. */
  queryFn: QueryFunction<TSubscriptionFnData, TSubscriptionKey, TPageParam>
  /** Forgets the stream error remembered by the hook, so `queryFn` stops rethrowing it. */
  clearErrors: () => void
}

/**
 * Builds a React Query `queryFn` that is backed by an RxJS observable.
 *
 * The returned `queryFn` resolves with the first value emitted by the
 * observable created by `subscriptionFn`. Every later emission is written
 * into the query cache under the same query key through `dataUpdater`.
 *
 * When the stream errors, the error is remembered and rethrown by the next
 * `queryFn` run; this keeps happening until `clearErrors` is called. When the
 * stream subscription is finalized, the query is invalidated.
 *
 * @param subscriptionFn - Creates the observable for a query function context.
 * @param dataUpdater - Maps an emitted value, the currently cached data and
 *   the page param to the value stored in the cache.
 * @returns The `queryFn` to pass to React Query and a `clearErrors` callback
 *   that drops the remembered stream error.
 */
export function useObservableQueryFn<
  TSubscriptionFnData = unknown,
  TCacheData = TSubscriptionFnData,
  TSubscriptionKey extends QueryKey = QueryKey,
>(
  subscriptionFn: (
    context: QueryFunctionContext<TSubscriptionKey, TPageParam>
  ) => Observable<TSubscriptionFnData>,
  dataUpdater: (
    data: TSubscriptionFnData,
    previousData: unknown,
    pageParam: TPageParam
  ) => TCacheData
): UseObservableQueryFnResult<TSubscriptionFnData, TSubscriptionKey> {
  const queryClient = useQueryClient()

  // We cannot assume that this fn runs for this component.
  // It might be a different observer associated to the same query key.
  //
  // The error is boxed so that "an error is pending" is decided by the
  // presence of the box, not by the truthiness of the thrown value. Streams
  // may error with anything, including `null`, `undefined`, `""` or `0`.
  const failRefetchWith = useRef<{ error: unknown } | null>(null)

  const queryFn: QueryFunction<
    TSubscriptionFnData,
    TSubscriptionKey,
    TPageParam
  > = (context) => {
    const { queryKey: subscriptionKey, pageParam, signal } = context
    const hashedSubscriptionKey = hashKey(subscriptionKey)

    // A previous stream error is surfaced by failing this (re)fetch with it.
    const pendingFailure = failRefetchWith.current
    if (pendingFailure) {
      throw pendingFailure.error
    }

    type Result = Promise<TSubscriptionFnData> & { cancel?: () => void }

    // `share()` lets the promise below and the long-lived subscription
    // further down consume the same source subscription.
    const stream$ = subscriptionFn(context).pipe(share())

    // A rejection of this promise reaches React Query directly, because the
    // promise itself is what `queryFn` returns.
    const result: Result = firstValueFrom(stream$)

    // Marks the query as stale without cancelling a fetch in flight.
    const invalidate = () => {
      void queryClient.invalidateQueries(
        { queryKey: subscriptionKey },
        { cancelRefetch: false }
      )
    }

    // Fixes scenario when component unmounts before first emit.
    // If we do not invalidate the query, the hook will never re-subscribe,
    // as data are otherwise marked as fresh.
    //
    // `signal` is available on context from ReactQuery 3.30.0
    // If `AbortController` is not available in the current runtime environment
    // ReactQuery sets `signal` to `undefined`. In that case we fallback to
    // old API, attaching `cancel` fn on promise.
    // @see https://tanstack.com/query/v4/docs/guides/query-cancellation
    if (signal) {
      signal.addEventListener("abort", invalidate)
    } else {
      result.cancel = invalidate
    }

    // Presumably disposes the subscription stored by a previous run for this
    // key/page before a new one is created — see `./storage` to confirm.
    cleanupSubscription(
      queryClient,
      hashedSubscriptionKey,
      pageParam ?? undefined
    )

    const subscription = stream$
      .pipe(
        skip(1), // Skip the first value, as it's already returned in the promise
        tap((data) => {
          queryClient.setQueryData<TCacheData>(subscriptionKey, (previous) =>
            dataUpdater(data, previous, pageParam)
          )
        }),
        catchError((error: unknown) => {
          // Remember the error; the next `queryFn` run rethrows it.
          failRefetchWith.current = { error }
          queryClient.setQueryData<TCacheData>(
            subscriptionKey,
            (data) => data,
            {
              // To make the retryOnMount work
              // @see: https://github.com/tannerlinsley/react-query/blob/9e414e8b4f3118b571cf83121881804c0b58a814/src/core/queryObserver.ts#L727
              updatedAt: 0,
            }
          )
          return of(undefined)
        }),
        // Runs on completion, after a handled error, and on unsubscribe.
        finalize(invalidate)
      )
      .subscribe()

    // remember the current subscription
    // see `cleanup` fn for more info
    storeSubscription(
      queryClient,
      hashedSubscriptionKey,
      subscription,
      pageParam ?? undefined
    )

    return result
  }

  return {
    queryFn,
    clearErrors: () => {
      // Once the error has been thrown, and a query result created (with error)
      // cleanup the `failRefetchWith`.
      failRefetchWith.current = null
    },
  }
}
