import { Injectable } from '@angular/core';
import { LRTaskState, LRTaskStatus } from '@app/api';
import { ApiService } from '@app/core/services';
import { asyncScheduler, Observable, SchedulerLike } from 'rxjs';
import { finalize, switchMap, takeWhile } from 'rxjs/operators';

/**
 * Polls the backend for the status of long-running tasks.
 *
 * For each task id the service builds an observable that repeatedly requests
 * the task status (with a growing pause between requests) and completes once
 * the task is neither `Working` nor `Pending`.
 */
@Injectable({
  providedIn: 'root',
})
export class LongRunningTaskService {
  /**
   * Polling observables for tasks that are currently being tracked, keyed by
   * task id. Persistent: entries live on the service instance until the
   * corresponding polling source is torn down.
   */
  private readonly taskMapping = new Map<string, Observable<LRTaskStatus>>();

  constructor(protected apiService: ApiService) {}

  /**
   * Returns the status observable for a task, reusing the cached one when the
   * task is already being tracked and creating (and caching) it otherwise.
   *
   * NOTE(review): the pipeline is not multicast, so every subscription starts
   * its own timer and issues its own requests, and the first subscription to
   * end removes the cache entry for everyone. If a single poll loop per task is
   * intended, a `share()`/`shareReplay()` at the end of the pipeline is needed.
   *
   * @param taskId Identifier of the long-running task.
   * @returns Observable emitting each polled status, including the final one.
   */
  getObservableForTask(taskId: string): Observable<LRTaskStatus> {
    const cached = this.taskMapping.get(taskId);

    return cached !== undefined ? cached : this.create(taskId);
  }

  /**
   * Builds the polling pipeline for a task and registers it in the cache.
   *
   * @param taskId Identifier of the long-running task.
   * @returns The newly created status observable.
   */
  private create(taskId: string): Observable<LRTaskStatus> {
    const status$ = delayGradient().pipe(
      // Cleanup: drop the cache entry when the timer source is torn down
      // (completion, error or unsubscription).
      finalize(() => {
        this.taskMapping.delete(taskId);
      }),
      // Request: every tick triggers a fresh status request; a request still
      // in flight from the previous tick is abandoned.
      switchMap(() => this.apiService.getTaskStatusObservable(taskId)),
      // Stop when the task is done. `inclusive = true` lets the first
      // non-running status through before completing.
      takeWhile(
        status => status.state === LRTaskState.Working || status.state === LRTaskState.Pending,
        true
      )
    );

    this.taskMapping.set(taskId, status$);

    return status$;
  }
}

/**
 * Creates a cold observable that emits an increasing counter (1, 2, 3, ...)
 * with a logarithmically growing pause between emissions.
 *
 * The first value is emitted on the scheduler's first run (no delay is
 * requested). After emitting value `k`, the next run is scheduled
 * `baseDelayMs * ln(k + 1)` milliseconds later, so polling slows down the
 * longer it lasts. The observable never completes on its own; it stops when
 * the consumer unsubscribes.
 *
 * @param scheduler Scheduler used to run the ticks; defaults to `asyncScheduler`.
 * @param baseDelayMs Scale factor of the logarithmic delay, in milliseconds.
 * @returns Observable emitting the 1-based tick count.
 */
function delayGradient(
  scheduler: SchedulerLike = asyncScheduler,
  baseDelayMs = 1600
): Observable<number> {
  return new Observable<number>(subscriber => {
    let count = 0;

    // The returned subscription is the teardown: unsubscribing cancels the
    // pending tick. A regular function is required because `this` is the
    // scheduler action, which is used to reschedule itself.
    return scheduler.schedule(function () {
      if (subscriber.closed) {
        // Consumer is gone: notifying a closed subscriber does nothing, so
        // simply stop rescheduling.
        return;
      }

      count += 1;
      const delay = baseDelayMs * Math.log(count + 1);

      // Reschedule before emitting so the next tick is already queued (and
      // cancellable through the same subscription) while consumers handle
      // the current value.
      this.schedule(undefined, delay);
      subscriber.next(count);
    });
  });
}
