import { timer, Observable } from 'rxjs';
import { tap, filter, switchMap, scan, first } from 'rxjs/operators';

/**
 * Builds a guard that enforces an upper bound on an attempt counter.
 *
 * @param maxAttempts Largest attempt count that is still accepted.
 * @returns A function that takes the current attempt count and returns
 *   normally while the count does not exceed `maxAttempts`.
 * @throws Error (from the returned guard) with message `'Error: max attempts'`
 *   once the count is greater than `maxAttempts`.
 */
function checkAttempts(maxAttempts: number) {
  const assertWithinLimit = (count: number): void => {
    const limitExceeded = count > maxAttempts;
    if (!limitExceeded) {
      return;
    }
    throw new Error('Error: max attempts');
  };
  return assertWithinLimit;
}

/**
 * Operator factory that re-subscribes to the source on a fixed interval until
 * a value satisfying `responsePredicate` is emitted.
 *
 * Behaviour:
 * - A timer ticks immediately and then every `pollInterval` milliseconds.
 * - Every tick is counted as an attempt, including ticks that are skipped
 *   because the previous source subscription has not yet emitted or completed.
 *   Once the tick count exceeds `maxAttempts` the result errors with
 *   `Error('Error: max attempts')`.
 * - A tick only triggers a new source subscription when the previous one has
 *   emitted a value or completed.
 * - The first emitted value for which `responsePredicate` returns true is
 *   forwarded and the result completes.
 *
 * @param pollInterval Delay between timer ticks, in milliseconds.
 * @param maxAttempts Maximum number of timer ticks allowed before erroring.
 * @param responsePredicate Returns true for the value that should end polling.
 * @returns A function mapping the source observable to the polling observable.
 */
export function pollUntil<T>(
  pollInterval: number,
  maxAttempts: number,
  responsePredicate: (res: any) => boolean,
): (source$: Observable<T>) => Observable<T> {
  return (source$: Observable<T>) =>
    // Build the pipeline per subscription so the in-flight flag is never
    // shared between (or left stale for) separate subscriptions.
    new Observable<T>((subscriber) => {
      let sourceIdle = true;
      const markIdle = (): void => {
        sourceIdle = true;
      };

      return timer(0, pollInterval)
        .pipe(
          // Count ticks before filtering so a hanging source still hits the limit.
          scan((attempts) => attempts + 1, 0),
          tap(checkAttempts(maxAttempts)),
          // Skip ticks while the previous poll is still outstanding.
          filter(() => sourceIdle),
          switchMap(() => {
            sourceIdle = false;
            // Release on completion too: a source that completes without
            // emitting must not block every subsequent tick.
            return source$.pipe(tap({ next: markIdle, complete: markIdle }));
          }),
          first(responsePredicate),
        )
        .subscribe(subscriber);
    });
}
