/* eslint-disable @typescript-eslint/no-explicit-any */
import { Injectable, NgZone } from '@angular/core';
import { first, Observable, switchMap, retry } from 'rxjs';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { UserService } from '../login/providers';
import { fp } from '@origin8-web/o8-utils/fp';

/**
 * Wraps the browser `EventSource` API (Server-Sent Events) in RxJS observables.
 *
 * A stream is opened in two steps: an HTTP GET (carrying the user's bearer token
 * when one is available) returns a `streamUrl`, and an `EventSource` is then
 * opened on that URL.
 */
@Injectable({
  providedIn: 'root',
})
export class SseService {
  constructor(private http: HttpClient, private userService: UserService, private _zone: NgZone) {}

  /**
   * Fetches a stream URL for `url` and subscribes to the SSE stream it designates.
   *
   * On stream error the whole sequence (URL fetch + `EventSource` creation) is
   * retried according to the retry options. The retry counter is reset after
   * each successfully received value (`resetOnSuccess: true`).
   *
   * @param url Endpoint that returns `{ streamUrl }` for the SSE stream.
   * @param options Optional settings:
   *   - `count`: number of reconnection attempts. `0` disables reconnection. Defaults to 12.
   *   - `delay`: milliseconds to wait between attempts. Defaults to 15000.
   *     With the defaults, reconnection is attempted for roughly 3 minutes before giving up.
   *   - `onSseStart`: callback invoked each time the underlying `EventSource` opens.
   * @returns An observable of the payloads emitted by the stream.
   */
  subscribeToSecureSse<T extends object>(
    url: string,
    options?: { count?: number; delay?: number; onSseStart?: () => void },
  ): Observable<T> {
    const activeOptions = { count: 12, delay: 15000, onSseStart: () => {}, ...options };
    const retryOptions = fp.pick(activeOptions, 'count', 'delay');
    const sse$ = this.getSignedUrlForSse(url).pipe(
      switchMap((res) => this.getSseForUrl<T>(res.streamUrl, fp.pick(activeOptions, 'onSseStart'))),
    );
    return retryOptions.count === 0 ? sse$ : sse$.pipe(retry({ ...retryOptions, resetOnSuccess: true }));
  }

  /**
   * Opens an `EventSource` on `url` and exposes its events as an observable.
   *
   * - Default (`message`) events emit the JSON-parsed event data.
   * - Named `result` events emit the `result` property of the JSON-parsed event data.
   * - Any `error` event closes the `EventSource` and errors the observable.
   *
   * The `EventSource` is created on subscription and closed on unsubscription,
   * so the connection does not outlive its consumer.
   *
   * @param url URL of the SSE stream.
   * @param options Optional `onSseStart` callback, invoked when the connection opens.
   * @returns A cold observable of the stream payloads; it never completes on its own.
   */
  getSseForUrl<T extends object>(url: string, options?: { onSseStart: () => void }): Observable<T> {
    return new Observable<T>((observer) => {
      const eventSource = new EventSource(url);

      // Emissions are pushed from inside the Angular zone so that subscribers
      // trigger change detection.
      const messageHandler = (evt: MessageEvent<string>) => {
        this._zone.run(() => {
          observer.next(JSON.parse(evt.data) as T);
        });
      };

      const onResultListener = (evt: MessageEvent<string>) => {
        this._zone.run(() => {
          observer.next(JSON.parse(evt.data).result);
        });
      };

      // Detaches every handler and closes the connection. Safe to call more than
      // once: it runs on stream error and again as the subscription teardown.
      const closeStream = () => {
        eventSource.removeEventListener('result', onResultListener);
        eventSource.onmessage = null;
        eventSource.onopen = null;
        eventSource.onerror = null;
        eventSource.close();
      };

      const errorHandler = (err: any) => {
        this._zone.run(() => {
          console.warn(`An error occured on the sse stream at ${new Date().toISOString()}. Closing it.`, err);
          // Close explicitly before propagating the error so the EventSource
          // cannot attempt its own reconnection on this URL.
          closeStream();
          observer.error(err);
        });
      };

      const openHandler = () => {
        if (options?.onSseStart) {
          console.log(`Opening SSE Stream at ${new Date().toISOString()}`);
          options.onSseStart();
        }
      };

      eventSource.addEventListener('result', onResultListener);
      eventSource.onmessage = messageHandler;
      eventSource.onopen = openHandler;
      eventSource.onerror = errorHandler;

      // Teardown: without this, unsubscribing would leave the connection open.
      return closeStream;
    });
  }

  /**
   * Requests the stream URL for an SSE endpoint.
   *
   * Takes the first value emitted by `userService.getAccessToken$()` and, when it
   * is truthy, sends it as an `Authorization: Bearer <token>` header; otherwise
   * the request is sent without that header.
   *
   * @param url Endpoint to GET.
   * @returns An observable of the response body, expected to contain `streamUrl`.
   */
  getSignedUrlForSse(url: string): Observable<{ streamUrl: string }> {
    return this.userService.getAccessToken$().pipe(
      first(),
      switchMap((token) => {
        const headers = token ? new HttpHeaders().set('Authorization', `Bearer ${token}`) : new HttpHeaders();
        return this.http.get<{ streamUrl: string }>(url, { headers });
      }),
    );
  }
}
