import chunk from 'lodash/chunk';
import { YieldController } from './yieldController';
import assertNever from '@watershed/shared-util/assertNever';

/**
 * Async counterpart of `Array.prototype.filter`.
 *
 * Every predicate is started through `mapAsync`; once all of them have
 * resolved, the elements whose predicate resolved to a truthy value are
 * collected in their original order.
 *
 * @param arr the elements to test
 * @param predicate async test deciding whether an element is kept
 * @returns a Promise that resolves to a new array holding the elements that
 * passed the test.
 */
export async function filterAsync<T>(
  arr: ReadonlyArray<T>,
  predicate: (v: T) => Promise<boolean>
): Promise<Array<T>> {
  // Evaluate all predicates up front.
  const keepFlags = await mapAsync(arr, predicate);

  // Collect the survivors, yielding the event loop whenever the yield
  // controller asks for it.
  const kept: Array<T> = [];
  const yieldController = new YieldController();
  for (const [index, keep] of keepFlags.entries()) {
    if (yieldController.shouldYield()) {
      await yieldController.yield();
    }
    if (keep) {
      kept.push(arr[index]);
    }
  }

  return kept;
}

/**
 * Async counterpart of `Array.prototype.forEach`.
 *
 * Elements are processed strictly one at a time: the callback for an element
 * is awaited before the next element is looked at. Between elements the event
 * loop is yielded whenever the yield controller asks for it.
 *
 * @param arr the elements to visit
 * @param iteratorFn async callback receiving the element and its index
 */
export async function forEachAsync<T>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, i: number) => Promise<void>
): Promise<void> {
  const yieldController = new YieldController();
  let index = 0;
  while (index < arr.length) {
    if (yieldController.shouldYield()) {
      await yieldController.yield();
    }
    await iteratorFn(arr[index], index);
    index += 1;
  }
}

/**
 * Like forEachAsync, but runs the callback in concurrent batches.
 *
 * Items are pulled from `iter` into a batch of `concurrency` items. The
 * callbacks of one batch are started together and awaited with `Promise.all`
 * before the next batch is filled; a final, possibly smaller, batch handles
 * the remainder.
 *
 * If a callback rejects, the returned promise rejects with that reason and no
 * further batches are started (callbacks already started in the failing batch
 * are not cancelled).
 *
 * @param iter the items to visit; any iterable is accepted
 * @param iteratorFn callback receiving the item and its overall position in
 * `iter` (not its position inside the current batch)
 * @param concurrency number of callbacks per batch. Expected to be a positive
 * integer: for any other value the batch size never equals it, so every item
 * ends up in one single batch that runs after `iter` is exhausted.
 */
export async function forEachAsyncWithConcurrency<T>(
  iter: Iterable<T>,
  iteratorFn: (v: T, i: number) => void | Promise<void>,
  concurrency: number
): Promise<void> {
  // This implementation doesn't use p-limit because we had trouble getting
  // the dynamic import (as used by mapAsyncWithConcurrency below) to work with
  // Jest in CI.
  let batch: Array<T> = [];
  // Overall position in `iter` of the first item of the current batch.
  let batchStart = 0;

  const flushBatch = async (): Promise<void> => {
    if (batch.length === 0) {
      return;
    }
    const offset = batchStart;
    await Promise.all(batch.map((item, i) => iteratorFn(item, offset + i)));
    batchStart += batch.length;
    batch = [];
  };

  for (const item of iter) {
    batch.push(item);
    if (batch.length === concurrency) {
      await flushBatch();
    }
  }
  // Run whatever is left over after the last full batch.
  await flushBatch();
}

/**
 * Async counterpart of `Array.prototype.map`: starts `iteratorFn` for every
 * element without waiting for earlier calls to finish, then waits for all of
 * them together.
 *
 * Warning: despite being async, calling this with a large array may still
 * result in Node blocked time. Prefer mapAsyncChunked in this case.
 *
 * NOTE(review): promises started before a yield have no rejection handler
 * until `Promise.all` is reached; if `YieldController.yield()` lets the event
 * loop turn, an early rejection could presumably be reported as unhandled —
 * confirm.
 *
 * @param arr the elements to map
 * @param iteratorFn async mapper receiving the element and its index
 * @returns a Promise that resolves to the mapped values, in input order.
 */
export async function mapAsync<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, i: number) => Promise<S>
): Promise<Array<S>> {
  const pending: Array<Promise<S>> = [];
  const yieldController = new YieldController();

  for (const index of arr.keys()) {
    if (yieldController.shouldYield()) {
      await yieldController.yield();
    }
    pending.push(iteratorFn(arr[index], index));
  }

  return Promise.all(pending);
}

/**
 * Async counterpart of `Array.prototype.flatMap`.
 *
 * `iteratorFn` is started for every element at once; when all calls have
 * resolved, results that are arrays are flattened by one level and single
 * values are kept as they are.
 *
 * @param arr the elements to map
 * @param iteratorFn async mapper returning either one value or an array of
 * values
 * @returns a Promise that resolves to the flattened results, in input order.
 */
export async function flatMapAsync<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, i: number) => Promise<Array<S> | S>
): Promise<Array<S>> {
  const nested = await Promise.all(arr.map(iteratorFn));
  return nested.flat() as Array<S>;
}

/**
 * Async counterpart of `Array.prototype.map` that processes the elements one
 * after another instead of in parallel: `iteratorFn` for an element is only
 * invoked once the call for the previous element has resolved.
 *
 * @param arr the elements to map
 * @param iteratorFn async mapper receiving the element and its index
 * @returns a Promise that resolves to the mapped values, in input order. It
 * rejects with the first failure, and later elements are then not mapped.
 */
export async function mapAsyncSerially<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, index: number) => Promise<S>
): Promise<Array<S>> {
  const mapped: Array<S> = [];

  // One link of the chain: wait for the predecessor, then map this element.
  const runAfter = async (
    previous: Promise<void>,
    item: T,
    index: number
  ): Promise<void> => {
    await previous;
    mapped.push(await iteratorFn(item, index));
  };

  // Build the whole chain up front; each link only starts its work after the
  // one before it has finished.
  let tail: Promise<void> = Promise.resolve();
  arr.forEach((item, index) => {
    tail = runAfter(tail, item, index);
  });

  await tail;
  return mapped;
}

/**
 * Async counterpart of `Array.prototype.flatMap` that processes the elements
 * one after another instead of in parallel: `iteratorFn` for an element is
 * only invoked once the call for the previous element has resolved.
 *
 * @param arr the elements to map
 * @param iteratorFn async mapper returning either one value or an array of
 * values
 * @returns a Promise that resolves to the results in input order, with array
 * results flattened by one level.
 */
export async function flatMapAsyncSerially<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, index: number) => Promise<Array<S> | S>
): Promise<Array<S>> {
  const collected: Array<S | Array<S>> = [];

  // Chain one step per element; every step first waits for the step before
  // it, so the mapper never runs for two elements at the same time.
  let previousStep: Promise<void> = Promise.resolve();
  arr.forEach((item, index) => {
    const waitFor = previousStep;
    previousStep = (async () => {
      await waitFor;
      collected.push(await iteratorFn(item, index));
    })();
  });
  await previousStep;

  return collected.flat() as Array<S>;
}

/**
 * Splits a list into chunks of `chunkSize` (via lodash `chunk`) and calls an
 * async function with each chunk, one chunk at a time. The per-chunk results
 * are concatenated into a single flat array.
 *
 * NOTE(review): chunking is delegated to lodash; presumably a `chunkSize`
 * below 1 yields no chunks, in which case nothing is mapped — confirm against
 * the lodash documentation.
 *
 * @param arr the elements to process
 * @param iteratorFn async function receiving the chunk, the chunk's index and
 * the total number of chunks
 * @param chunkSize number of elements per chunk
 * @returns a Promise that resolves to all mapped items, in chunk order.
 */
export async function mapAsyncChunked<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (
    v: Array<T>,
    chunkIndex: number,
    totalChunks: number
  ) => Promise<Array<S>>,
  chunkSize: number
): Promise<Array<S>> {
  const chunks = chunk(arr, chunkSize);
  const totalChunks = chunks.length;
  const flattened: Array<S> = [];

  for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
    const mappedChunk = await iteratorFn(
      chunks[chunkIndex],
      chunkIndex,
      totalChunks
    );
    // Append item by item rather than spreading the chunk result.
    for (const mappedItem of mappedChunk) {
      flattened.push(mappedItem);
    }
  }

  return flattened;
}

/**
 * An asynchronous version of `Array.prototype.map` whose calls to
 * `iteratorFn` are routed through a `p-limit` limiter created with
 * `concurrency`. `p-limit` is loaded with a dynamic import on every call.
 *
 * @param arr the elements to map
 * @param iteratorFn async mapper receiving the element and its index
 * @param concurrency value handed to `p-limit` as the concurrency limit
 * @returns a Promise that resolves to the mapped values, in input order.
 */
export async function mapAsyncWithConcurrency<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, i: number) => Promise<S>,
  concurrency: number
): Promise<Array<S>> {
  const { default: pLimit } = await import('p-limit');
  const runLimited = pLimit(concurrency);

  // Queue every element with the limiter; the limiter decides when each
  // mapper call actually starts.
  const tasks = arr.map((elem, i) => runLimited(() => iteratorFn(elem, i)));

  return await Promise.all(tasks);
}

/**
 * An asynchronous version of `Array.prototype.flatMap` built on
 * `mapAsyncWithConcurrency`: the elements are mapped with the given
 * concurrency, then array results are flattened by one level.
 *
 * @param arr the elements to map
 * @param iteratorFn async mapper returning either one value or an array of
 * values
 * @param concurrency forwarded unchanged to `mapAsyncWithConcurrency`
 * @returns a Promise that resolves to the flattened results, in input order.
 */
export async function flatMapAsyncWithConcurrency<T, S>(
  arr: ReadonlyArray<T>,
  iteratorFn: (v: T, i: number) => Promise<Array<S> | S>,
  concurrency: number
): Promise<Array<S>> {
  const nested = await mapAsyncWithConcurrency(arr, iteratorFn, concurrency);
  return nested.flat() as Array<S>;
}

/**
 * Asynchronous function composition. Given `f` and an async `g`, builds
 * `f ∘ g`, i.e. a function that first awaits `g(x)` and then applies `f` to
 * the outcome.
 *
 * @param f function applied to the resolved value of `g`
 * @param g async function applied to the input first
 * @returns an asynchronous function that computes `f(g(x))`.
 */
export function composeAsync<In, Intermediate, Out>(
  f: (a1: Intermediate) => Out,
  g: (a1: In) => Promise<Intermediate>
): (a1: In) => Promise<Out> {
  return async (input) => {
    const intermediate = await g(input);
    return f(intermediate);
  };
}

/**
 * An asynchronous version of `Array.prototype.every`. The predicate is
 * awaited for one element at a time, in order, and evaluation stops at the
 * first element whose predicate resolves to a falsy value.
 *
 * @param arr the elements to test
 * @param predicate async test receiving the element and its index
 * @returns a Promise that resolves to `false` as soon as one element fails
 * the test, and to `true` otherwise (including for an empty array).
 */
export async function everyAsync<T>(
  arr: ReadonlyArray<T>,
  predicate: (v: T, i: number) => Promise<boolean>
): Promise<boolean> {
  for (const [index, element] of arr.entries()) {
    const passed = await predicate(element, index);
    if (!passed) {
      return false;
    }
  }
  return true;
}

/**
 * An asynchronous version of `Array.prototype.some`. The predicate is awaited
 * for one element at a time, in order, and evaluation stops at the first
 * element whose predicate resolves to a truthy value.
 *
 * @param arr the elements to test
 * @param predicate async test receiving the element and its index; the index
 * is always supplied
 * @returns a Promise that resolves to `true` as soon as one element passes
 * the test, and to `false` otherwise (including for an empty array).
 */
export async function someAsync<T>(
  arr: ReadonlyArray<T>,
  predicate: (v: T, i: number) => Promise<boolean>
): Promise<boolean> {
  for (let i = 0; i < arr.length; i++) {
    if (await predicate(arr[i], i)) {
      return true;
    }
  }
  return false;
}

/**
 * An asynchronous version of `Array.prototype.find`. The predicate is awaited
 * for one element at a time, in order, and the search stops at the first
 * element whose predicate resolves to a truthy value.
 *
 * @param arr the elements to search
 * @param predicate async test applied to each element
 * @returns a Promise that resolves to the first matching element, or to
 * `undefined` when no element matches.
 */
export async function findAsync<T>(
  arr: ReadonlyArray<T>,
  predicate: (v: T) => Promise<boolean>
): Promise<T | undefined> {
  let index = 0;
  while (index < arr.length) {
    const matched = await predicate(arr[index]);
    if (matched) {
      return arr[index];
    }
    index += 1;
  }
  return undefined;
}

/**
 * Maps the values of an object with an async function, keeping the keys.
 *
 * `iteratorFn` is started for the value of every own enumerable string key
 * (as reported by `Object.keys`) without waiting for earlier calls; the
 * result is assembled once all calls have resolved.
 *
 * @param obj the object whose values are mapped
 * @param iteratorFn async mapper applied to each value
 * @returns a Promise that resolves to a new object with the same keys and the
 * mapped values.
 */
export async function mapValuesAsync<T, V>(
  obj: Record<string, T>,
  iteratorFn: (v: T) => Promise<V>
): Promise<Record<string, V>> {
  const keys = Object.keys(obj);
  const values = await Promise.all(keys.map((key) => iteratorFn(obj[key])));
  // Object.fromEntries defines own properties, so a key such as `__proto__`
  // is copied like any other key instead of going through the prototype
  // setter (which would drop the entry or rewire the result's prototype).
  return Object.fromEntries(
    keys.map((key, index): [string, V] => [key, values[index]])
  );
}

/** Failure cap applied by `doWhileInBatches` when `maxFailures` is omitted. */
const DEFAULT_MAX_FAILURES = 10;

/**
 * Repeatedly calls `fn` to process one batch at a time, accumulating the
 * outcome of all batches.
 *
 * `fn` is always called at least once. It receives the result of the previous
 * batch (`null` for the first call). After each batch the loop continues only
 * while all of the following hold:
 * - the batch just processed reported at least one success,
 * - the accumulated number of failures is below `maxFailures`,
 * - fewer than `batchLimit` batches have been processed (when a limit is
 *   given).
 *
 * @param fn processes one batch and reports its successes, failures and,
 * optionally, a `processedUpTo` marker
 * @param batchLimit maximum number of batches to process; unlimited when
 * omitted
 * @param maxFailures accumulated failure count at which processing stops;
 * defaults to `DEFAULT_MAX_FAILURES`
 * @returns the total number of successes, the successes of the first batch as
 * a sample, all failures collected, whether the failure cap was reached, and
 * the `processedUpTo` reported by the last batch.
 */
export async function doWhileInBatches<TSuccess, TFailure>(
  fn: (
    prevBatchResults: {
      successes: Array<TSuccess>;
      failures: Array<TFailure>;
      processedUpTo?: string;
    } | null
  ) => Promise<{
    successes: Array<TSuccess>;
    failures: Array<TFailure>;
    processedUpTo?: string;
  }>,
  batchLimit?: number,
  maxFailures?: number
): Promise<{
  successCount: number;
  sampleSuccesses: Array<TSuccess>;
  failures: Array<TFailure>;
  tooManyFailures: boolean;
  processedUpTo?: string;
}> {
  const failureCap = maxFailures ?? DEFAULT_MAX_FAILURES;
  const allFailures: Array<TFailure> = [];
  let batchesProcessed = 0;
  let totalSuccessCount = 0;
  let latestSuccessCount = 0;
  // Set by the first batch and never replaced afterwards.
  let sampleSuccesses: Array<TSuccess> | undefined;
  let prevBatchResults: Parameters<typeof fn>[0] = null;

  do {
    const batchResults = await fn(prevBatchResults);
    const { successes, failures } = batchResults;
    latestSuccessCount = successes.length;
    totalSuccessCount += latestSuccessCount;
    sampleSuccesses ??= successes;
    // Append one by one: spreading a very large array into push() can exceed
    // the engine's argument limit.
    for (const failure of failures) {
      allFailures.push(failure);
    }
    batchesProcessed += 1;
    prevBatchResults = batchResults;
  } while (
    latestSuccessCount > 0 &&
    allFailures.length < failureCap &&
    (batchLimit === undefined || batchesProcessed < batchLimit)
  );

  return {
    successCount: totalSuccessCount,
    // The loop body runs at least once, so the fallback is only there to
    // satisfy the type checker.
    sampleSuccesses: sampleSuccesses ?? [],
    failures: allFailures,
    tooManyFailures: allFailures.length >= failureCap,
    processedUpTo: prevBatchResults?.processedUpTo,
  };
}

/**
 * Copied from duckdb-server.
 *
 * Behaves like `Promise.all` in that it resolves with an array of the results
 * once every promise is done, but like `Promise.allSettled` in that it waits
 * for every promise in the array to settle before either resolving or
 * rejecting (unlike `Promise.all`, which short-circuits on the first
 * rejection).
 *
 * @param ps the promises to wait for
 * @returns a Promise that resolves to the fulfilled values, in input order.
 * If any promise rejected, it rejects with the reason of the first rejected
 * promise in array order.
 */
export async function pAll<T>(ps: Array<Promise<T>>): Promise<Array<T>> {
  const settled = await Promise.allSettled(ps);
  const values: Array<T> = [];
  for (const outcome of settled) {
    if (outcome.status === 'fulfilled') {
      values.push(outcome.value);
    } else if (outcome.status === 'rejected') {
      throw outcome.reason;
    } else {
      assertNever(outcome);
    }
  }
  return values;
}
