import {
    EMPTY,
    merge,
    Observable,
    of,
    OperatorFunction,
    Subject,
    throwError,
} from "rxjs"
// import { InnerSubscriber, OuterSubscriber } from "rxjs/internal-compatibility"
import { Map } from "immutable"
import {
    catchError,
    concatWith,
    distinctUntilChanged,
    endWith,
    filter,
    ignoreElements,
    map,
    mergeMap,
    scan,
    share,
    take,
    takeUntil,
    takeWhile,
    tap,
} from "rxjs/operators"

interface ProcessorOutput {
    id: number
    next?: any
    err?: any
    completed?: boolean
}

/**
 * Create a operator factory. The factory creates operators that is similar to mergeMap mergeMap.
 * Different from the mergeMap operator, operators created by the factory share the same concurrent context.
 * Those operators might be used by multiple stream but the running job count won't exceed given concurrent degrees.
 *
 *
 * @example
 * const source = interval(100)
 * const strand = newStrand(2)
 *
 * // cosnt buggyStreamA = source.pipe(mergeMap(x =\> of(x * 2), 2))
 * const streamA = source.pipe(strand((x) =\> of(x * 2)))
 * // const buggyStreamB = source.pipe(mergeMap(x =\> of(x * 4), 2))
 * const streamB = source.pipe(strand((x) =\> of(x * 4)))
 *
 * // at any moment, allows at most 2 concurrent jobs
 * streamA.subscribe(console.log)
 * streamB.subscribe(console.log)
 *
 * // Each mergeMap operator ensures there's at most two concurrent jobs, while they are independent to each other, meaning
 * // there might be a moment that 4 concurrent jobs are running.
 * // buggyStreamA.subscribe(console.log)
 * // buggyStreamB.subscribe(console.log)
 *
 * @param concurrent - concurrent degrees, the maximum concurrent jobs allowed
 */
export function newStrand(concurrent: number) {
    const cancel$ = new Subject<number>()
    const processor$ = new Subject<{
        id: number
        job: Observable<any>
    }>()
    const running$: Observable<ProcessorOutput> = processor$.pipe(
        mergeMap(
            ({ id, job }) =>
                job.pipe(
                    map((next) => ({ id, next })),
                    concatWith(of({ id, completed: true })),
                    catchError((err) => of({ id, err })),
                    takeUntil(
                        cancel$.pipe(filter((cancelID) => cancelID === id)),
                    ),
                ),
            // concurrent,
        ),
        share(),
    )
    let next = 0
    function schedule<T>(job: Observable<T>): Observable<T> {
        const id = next++
        return new Observable<ProcessorOutput>((subscriber) => {
            running$.subscribe(subscriber)
            processor$.next({ id, job })
            return () => cancel$.next(id)
        }).pipe(
            filter(({ id: outputID }) => outputID === id),
            takeWhile((event) => !event.completed),
            mergeMap((event) => {
                if (event.err) {
                    return throwError(() => event.err)
                }
                return of(event.next as T)
            }),
        )
    }
    return function <I, O>(
        projection: (input: I) => Observable<O>,
    ): OperatorFunction<I, O> {
        return function (source: Observable<I>): Observable<O> {
            const jobs$ = source.pipe(map(projection))
            return jobs$.pipe(mergeMap((job) => schedule(job)))
        }
    }
}

interface StateStoreInternalAction<I, T extends { id: I }> {
    update?: T
    remove?: I
    stop?: boolean
}

interface StateStoreInternal<I, T extends { id: I }> {
    sourceCompleted: boolean
    store: Map<I, T>
}

/**
 * stateStore operator maintains a mutable list of T. The item is added to the list by events. If the item with
 * the same index I is already in the list, the item will be updated by merging the properties.
 *
 * An optional removal notifier remove$ can be specified to remove the item from the list by index.
 *
 * @param remove$ - notifier to remove items from the list
 */
export function stateStore<I, T extends { id: I }>(
    remove$?: Observable<I>,
): OperatorFunction<T, Map<I, T>> {
    return (source$: Observable<T>) => {
        return merge(
            source$.pipe(
                map((update) => ({ update })),
                endWith({ stop: true }),
            ),
            (remove$ || EMPTY).pipe(map((remove) => ({ remove }))),
        ).pipe(
            scan<StateStoreInternalAction<I, T>, StateStoreInternal<I, T>>(
                (
                    { store, sourceCompleted }: StateStoreInternal<I, T>,
                    c: StateStoreInternalAction<I, T>,
                ) => {
                    if (c.stop) {
                        return { store, sourceCompleted: true }
                    }
                    if (c.update) {
                        if (store.has(c.update.id)) {
                            return {
                                store: store.update(c.update.id, (origin) => ({
                                    ...origin,
                                    ...c.update,
                                })),
                                sourceCompleted,
                            }
                        }
                        return {
                            store: store.set(c.update.id, c.update),
                            sourceCompleted,
                        }
                    }
                    if (c.remove !== undefined) {
                        return {
                            store: store.remove(c.remove),
                            sourceCompleted,
                        }
                    }
                    return { store, sourceCompleted }
                },
                { store: Map<I, T>(), sourceCompleted: false },
            ),
            takeWhile(
                ({ store, sourceCompleted }) =>
                    !(sourceCompleted && store.isEmpty()),
                true,
            ),
            distinctUntilChanged((x, y) => x.store.equals(y.store)),
            map(({ store }) => store),
        )
    }
}

type ValueOfMap<M extends Map<any, any>> = M extends Map<any, infer V>
    ? V
    : never
export function mapToArray<M extends Map<any, any>>() {
    return map((x: M) => x.toArray() as Array<ValueOfMap<M>>)
}

/**
 * terminalActions accepts a bunch of terminal action notifier. Terminal action notifiers are observables
 * with the same event type T to the source observable. When one of the terminal action observables emits,
 * the operator unsubscribe the source and emit the notification from the terminal action observable as the last notification.
 *
 * @param terminals - sources of terminal actions
 */
export function terminalActions<T>(
    ...terminals: Array<Observable<T>>
): OperatorFunction<T, T> {
    const end$ = merge(...terminals).pipe(share())
    return (source$: Observable<T>) => {
        source$ = source$.pipe(share())
        return merge(
            source$.pipe(takeUntil(end$)),
            end$.pipe(
                takeUntil(source$.pipe(ignoreElements(), endWith(true))),
                take(1),
            ),
        )
    }
}

// debug utility
export function debug<T>(label?: string) {
    if (label) {
        return tap<T>(console.log.bind(null, label))
    }
    return tap<T>(console.log)
}
