import { catchError, connect, distinctUntilChanged, EMPTY, exhaustMap, merge, mergeMap, Observable, of, pipe, retry, switchMap, throwError, timer } from 'rxjs';
import { MANUALLY_STOP } from '../utils';
/**
 * Builds an operator that diverts every source value into `l$`.
 *
 * Each incoming value is handed to `l$.next` and then replaced by `EMPTY`,
 * so the piped stream emits no values of its own.
 *
 * @param {{ next: (value: any) => void }} l$ - Sink that receives each value.
 * @returns {Function} Operator suitable for `Observable#pipe`.
 */
export function mapInto(l$) {
    const divert = (incoming)=>{
        l$.next(incoming);
        return EMPTY;
    };
    return pipe(mergeMap(divert));
}
/**
 * Builds an operator that reports the stream's outcome into `l$` and
 * swallows errors.
 *
 * - When the source completes, `null` is pushed into `l$`.
 * - When the source errors, the error is pushed into `l$`, the optional `cb`
 *   is invoked with it, and the error is replaced by `EMPTY`.
 *
 * @param {{ next: (value: any) => void }} l$ - Sink receiving the error, or `null` on completion.
 * @param {(error: any) => void} [cb] - Optional callback invoked with the caught error.
 * @returns {Function} Operator suitable for `Observable#pipe`.
 */
export function catchErrorInto(l$, cb) {
    const clearOnComplete = onComplete(()=>l$.next(null));
    const captureFailure = catchError((failure)=>{
        l$.next(failure);
        cb?.(failure);
        return EMPTY;
    });
    return pipe(clearOnComplete, captureFailure);
}
/**
 * Builds an operator that invokes `cb` every time the resulting observable
 * is subscribed to, immediately before subscribing to the source.
 *
 * @param {() => void} cb - Callback run at the start of each subscription.
 * @returns {Function} Operator suitable for `Observable#pipe`.
 */
export function onStart(cb) {
    return function runOnSubscribe(source$) {
        return new Observable((subscriber)=>{
            // Run the hook first so it observes the state before any source emission.
            cb();
            return source$.subscribe(subscriber);
        });
    };
}
/**
 * Builds an operator that invokes `cb` when the source completes, just
 * before the completion is forwarded downstream.
 *
 * Values and errors from the source pass through untouched; `cb` is not
 * called when the source errors. If `cb` itself throws, the exception is
 * delivered downstream as an error notification and the completion is not
 * forwarded, so the subscriber always receives a terminal notification.
 *
 * @param {() => void} cb - Callback run on source completion.
 * @returns {Function} Operator suitable for `Observable#pipe`.
 */
export function onComplete(cb) {
    return (observable$)=>new Observable((subscriber)=>{
            return observable$.subscribe({
                next (value) {
                    subscriber.next(value);
                },
                error (err) {
                    subscriber.error(err);
                },
                complete () {
                    try {
                        cb();
                    } catch (err) {
                        // Surface a failing callback as a stream error rather than
                        // letting it escape this handler and strand the subscriber.
                        subscriber.error(err);
                        return;
                    }
                    subscriber.complete();
                }
            });
        });
}
/**
 * Wraps a promise, or a promise factory, in an Observable.
 *
 * When `promise` is a function it is called once per subscription with an
 * `AbortSignal`; otherwise the given promise is used as is. The Observable
 * emits the fulfilled value and completes, or errors with the rejection
 * reason.
 *
 * Unsubscribing while the promise is still pending aborts the signal with
 * `MANUALLY_STOP` as the reason. Once the promise has settled the signal is
 * left untouched, so teardown after a normal completion or error does not
 * report a manual stop or cancel work still tied to the signal.
 *
 * @param {Promise<any> | ((signal: AbortSignal) => Promise<any>)} promise -
 *   Promise to wrap, or factory receiving the abort signal.
 * @returns {Observable} Observable mirroring the promise's outcome.
 */
export function fromPromise(promise) {
    return new Observable((subscriber)=>{
        const abortController = new AbortController();
        // Tracks whether the promise has already fulfilled or rejected.
        let settled = false;
        const rawPromise = promise instanceof Function ? promise(abortController.signal) : promise;
        rawPromise.then((value)=>{
            settled = true;
            subscriber.next(value);
            subscriber.complete();
        }).catch((error)=>{
            settled = true;
            subscriber.error(error);
        });
        return ()=>{
            // Only a teardown that happens before settlement is a manual stop.
            if (!settled) {
                abortController.abort(MANUALLY_STOP);
            }
        };
    });
}
/**
 * Builds an operator that resubscribes to a failed source with exponential
 * backoff, using the `retry` operator.
 *
 * The wait before a retry is `delay * 2^(retryIndex - 1)` milliseconds,
 * capped at `maxDelay`, where `retryIndex` is the number `retry` passes to
 * its delay callback. When `when` is given and returns a falsy value for an
 * error, that error is rethrown instead of being retried.
 *
 * @param {object} [options]
 * @param {(err: any) => boolean} [options.when] - Predicate selecting retryable errors; all errors are retried when omitted.
 * @param {number} [options.count=3] - Retry count handed to `retry`.
 * @param {number} [options.delay=200] - Base delay in milliseconds.
 * @param {number} [options.maxDelay=15000] - Upper bound for a single delay in milliseconds.
 * @returns {Function} Operator suitable for `Observable#pipe`.
 */
export function backoffRetry({ when, count = 3, delay = 200, maxDelay = 15000 } = {}) {
    const scheduleRetry = (err, retryIndex)=>{
        const retryable = !when || when(err);
        if (!retryable) {
            return throwError(()=>err);
        }
        const uncapped = 2 ** (retryIndex - 1) * delay;
        return timer(Math.min(uncapped, maxDelay));
    };
    return (obs$)=>obs$.pipe(retry({
            count,
            delay: scheduleRetry
        }));
}
/**
 * Builds an operator that runs `project` through `exhaustMap`, but restarts
 * that inner pipeline through `switchMap` whenever the source value changes.
 *
 * The source is shared with `connect`. Changes are detected by
 * `distinctUntilChanged(comparator)`. On each change, `onSwitch` (if given)
 * is called with the new value, and a fresh pipeline is started from that
 * value followed by the shared source; its items are fed to `project` via
 * `exhaustMap`.
 *
 * @param {(previous: any, current: any) => boolean} comparator - Comparator handed to `distinctUntilChanged`.
 * @param {(value: any, index: number) => any} project - Projection handed to `exhaustMap`; receives the value and the index that `exhaustMap` supplies.
 * @param {(value: any) => void} [onSwitch] - Optional hook called with each changed value before the new pipeline is built.
 * @returns {Function} Operator suitable for `Observable#pipe`.
 */
export function exhaustMapSwitchUntilChanged(comparator, project, onSwitch) {
    const restartOnChange = (shared$)=>{
        const startPipeline = (changedValue)=>{
            onSwitch?.(changedValue);
            // Seed with the value that triggered the switch, then follow the shared source.
            const seeded$ = merge(of(changedValue), shared$);
            return seeded$.pipe(exhaustMap((item, index)=>project(item, index)));
        };
        return shared$.pipe(distinctUntilChanged(comparator), switchMap(startPipeline));
    };
    return pipe(connect(restartOnChange));
}
