import { Auth } from "@aws-amplify/auth"
import { Empty } from "google-protobuf/google/protobuf/empty_pb"
import { StatusCode } from "grpc-web"
import { basename } from "path-browserify"
import { IntlShape, defineMessages } from "react-intl"
import {
    EMPTY,
    NEVER,
    Observable,
    Observer,
    OperatorFunction,
    Subject,
    asyncScheduler,
    combineLatest,
    concat,
    forkJoin,
    from,
    iif,
    merge,
    of,
    timer,
} from "rxjs"
import {
    catchError,
    concatMap,
    delay,
    delayWhen,
    distinctUntilChanged,
    distinctUntilKeyChanged,
    endWith,
    filter,
    finalize,
    ignoreElements,
    map,
    mapTo,
    mergeMap,
    repeat,
    retry,
    retryWhen,
    scan,
    share,
    shareReplay,
    startWith,
    switchMap,
    take,
    takeUntil,
    takeWhile,
    tap,
    throttleTime,
    withLatestFrom,
} from "rxjs/operators"
import { TaskType } from "./TaskType"
import {
    Conflict,
    ConflictResolveType,
    GroupedTaskStatus,
    ResolveSubjects,
    TaskRef,
    TaskState,
    TransferTaskStatus,
    UITaskStatus,
    isID,
    isNestedID,
    stateScan,
} from "./context/task"
import { HttpKey } from "./enums"
import { OfGrpc } from "./grpc"
import { Progress, upload } from "./helper/api"
import {
    mapToArray,
    newStrand,
    stateStore,
    terminalActions,
} from "./helper/rxjs"
import driveErrorMessage from "./messages/driveErrorCode"
import toastMessage from "./messages/toast"
import { IFile } from "./models/IFile"
import { ensureSlash, multipleName } from "./path-util"
import { QueryUsedResponse } from "./protocol/customer_pb"
import {
    ArchiveRequest,
    ArchiveResponse,
    DriveErrorCode,
    DropRequest,
    GetUploadLinksRequest,
    GetUploadLinksResponse,
    IsExistsRequest,
    IsExistsResponse,
    MkdirRequest,
    MkdirResponse,
    PathMessage,
} from "./protocol/drive_pb"
import {
    GetTaskCountRequest,
    GetTaskCountResponse,
    ImageProcessingErrorCode,
    RemoveBackgroundRequest,
    RemoveBackgroundResponse,
} from "./protocol/image_processing_pb"
import { UploadFileInOrder } from "./utils/uploadInOrder"
import {
    getRemoveBackgroundImageSavePaths,
    getRemoveBackgroundRequest,
    getValidRemoveImageBackgroundFiles,
} from "./utils/removeBackground"
import { removeTrailingSlash } from "./utils/string"
import { getRandomID } from "./utils/random"
import { DELETE_FILE_CHUNK_SIZE } from "./constants"

/**
 * A browser `File` that may also carry a relative path.
 * `getFilePath` in this module reads `webkitRelativePath`, then `fullPath`,
 * then `name`, and uses the first non-empty value.
 */
export interface ContainPathFile extends File {
    // presumably attached by the drag-and-drop entry traversal — TODO confirm against callers
    fullPath: string
    webkitRelativePath: string
}

/** Upload failure reasons; `tryExplainError` maps each one to a localized message. */
export enum UploadError {
    // Explained to the user with the "Single file size can't exceed 4GB" message.
    EXCEED_UPLOAD_SIZE,
    // Reported by `uploadFileWorker` for `DriveErrorCode.DRIVE_SIZE_IS_NOT_ENOUGH`.
    INSUFFICIENT_SPACE,
    // Reported when `mkdir` answers `MkdirResponse.ErrorCode.SERVER_BUSY`.
    SERVER_BUSY,
    // Reported by `uploadFileWorker` for `DriveErrorCode.DRIVE_INVALID_DESTINATION`.
    DRIVE_INVALID_DESTINATION,
}

/**
 * Operator that wraps every emitted value in a single-key object,
 * e.g. `label("progress")` turns `0.5` into `{ progress: 0.5 }`.
 *
 * @param propName Key under which each value is stored.
 */
function label<V, P extends string>(
    propName: P,
): OperatorFunction<V, { [key in P]: V }> {
    const wrap = (value: V) => {
        return { [propName]: value } as { [key in P]: V }
    }
    return map(wrap)
}

/** Arguments of `uploadFileWorker`. */
interface IuploadFileWorker {
    // Destination path, forwarded to `uploadFileInOrder.storeWorkOperator`.
    uploadTo: string
    file: File
    // Upload confirmations; matched by `id` against the upload link's object id.
    uploaded$: Observable<UploadedEvent>
    // Only the first emitted identity is used.
    identity$: Observable<string>
    // Receives a mapped `UploadError`, or `undefined` for unrecognised failures.
    error$: Observer<UploadError | void>
    // An emission stops the in-flight upload; the worker also emits on it after a failure.
    stopSubTask$: Subject<Progress>
    uploadFileInOrder: UploadFileInOrder
    taskID: number
    subTaskID: number
    // Passed to `uploadFileInOrder.handleUploadedTask` once the upload is confirmed.
    uploadFileID: string
}

/**
 * Uploads one file and reports its transfer progress.
 *
 * Takes the first value of `identity$`, hands the file to
 * `uploadFileInOrder.storeWorkOperator`, uploads it to the link that operator
 * yields, and then waits for the `uploaded$` event whose `id` equals the
 * link's object id. Failures do not error the returned stream: they are
 * reported on `error$` and replaced by an empty stream.
 *
 * @returns The progress events emitted by `upload`.
 */
export function uploadFileWorker({
    // upload a file
    uploadTo,
    file,
    uploaded$,
    identity$,
    error$,
    stopSubTask$,
    uploadFileInOrder,
    taskID,
    subTaskID,
    uploadFileID,
}: IuploadFileWorker): Observable<Progress> {
    return identity$.pipe(
        take(1),
        switchMap((identity) => {
            return of(file).pipe(
                // NOTE(review): presumably queues the file and resolves its upload
                // link; the operator is defined in ./utils/uploadInOrder — confirm there.
                uploadFileInOrder.storeWorkOperator({
                    file,
                    identity,
                    uploadTo,
                    taskID,
                    subTaskID,
                    uploadFileID,
                    error$,
                    stopSubTask$,
                }),
                switchMap(({ urlLink, file }) =>
                    concat(
                        // First transfer the bytes, forwarding progress events...
                        upload(urlLink, file),
                        // ...then wait (emitting nothing) for the confirmation
                        // event that refers to this upload link.
                        uploaded$.pipe(
                            filter(
                                (uploaded) =>
                                    uploaded.id === urlLink.getObjectId(),
                            ),
                            take(1),
                            tap(() => {
                                uploadFileInOrder.handleUploadedTask(
                                    uploadFileID,
                                )
                            }),
                            ignoreElements(),
                        ),
                    ).pipe(takeUntil(stopSubTask$)),
                ),
                catchError((e) => {
                    // Only two drive error codes are translated; any other
                    // failure is reported as `undefined`.
                    let errorCode
                    if (e === DriveErrorCode.DRIVE_SIZE_IS_NOT_ENOUGH) {
                        errorCode = UploadError.INSUFFICIENT_SPACE
                    } else if (e === DriveErrorCode.DRIVE_INVALID_DESTINATION) {
                        errorCode = UploadError.DRIVE_INVALID_DESTINATION
                    }
                    error$.next(errorCode)
                    // Signal the sub task to stop with a "complete" progress value.
                    stopSubTask$.next({ loaded: 1, total: 1 })
                    uploadFileInOrder.deleteWorkTaskByTaskID([
                        taskID,
                        subTaskID,
                    ])
                    return EMPTY
                }),
            )
        }),
    )
}

/**
 * Runs `downloadWithProgressEvent` and retries it on failure.
 *
 * Each of the first three errors triggers a retry after a 500 ms delay; the
 * next error after that is rethrown to the subscriber.
 */
const downloadWithRetry = (
    prefix: string,
    downloadLink: string,
    isArchive?: boolean,
) => {
    const download$ = downloadWithProgressEvent(prefix, downloadLink, isArchive)
    return download$.pipe(
        retryWhen((errors) => {
            // Every emission here resubscribes to the download.
            const retrySignals$ = errors.pipe(delay(500), take(3))
            // Once the retry budget is spent, surface the next failure.
            const giveUp$ = errors.pipe(
                map((failure) => {
                    throw failure
                }),
            )
            return concat(retrySignals$, giveUp$)
        }),
    )
}

/**
 * Downloads `downloadLink` as a blob with `XMLHttpRequest`, emits the
 * request's progress events, and on success saves the blob through a
 * temporary `<a download>` element before completing.
 *
 * The saved file name is `prefix` (plus `.zip` when `isArchive` is truthy),
 * or a timestamped `saasphoto ….zip` name when `prefix` is empty.
 *
 * Errors with "download status code is not 200" when the request finishes
 * with any other status. Unsubscribing aborts the request.
 */
function downloadWithProgressEvent(
    prefix: string,
    downloadLink: string,
    isArchive?: boolean,
) {
    return new Observable<{ loaded: number; total: number }>((subscriber) => {
        const ajaxReq = new XMLHttpRequest()
        ajaxReq.responseType = "blob"
        ajaxReq.onprogress = (e) => {
            subscriber.next(e)
        }
        ajaxReq.onreadystatechange = () => {
            // the ajax is done
            if (ajaxReq.readyState === 4) {
                if (ajaxReq.status !== 200) {
                    subscriber.error(
                        new Error("download status code is not 200"),
                    )
                }
            }
        }
        ajaxReq.onload = () => {
            // `load` fires for every completed HTTP response, not only for
            // 200. The failure was already reported by onreadystatechange, so
            // do not save an error body (or touch an aborted response) here.
            if (subscriber.closed || ajaxReq.status !== 200) {
                return
            }
            const a = document.createElement("a")
            const url = window.URL.createObjectURL(ajaxReq.response)
            a.href = url
            a.download = prefix
                ? isArchive
                    ? `${prefix}.zip`
                    : prefix
                : `saasphoto ${new Date()
                      .toLocaleString()
                      .replace(/\//g, "-")}.zip`
            a.click()
            window.URL.revokeObjectURL(url)
            a.remove()
            subscriber.complete()
        }
        ajaxReq.open("GET", downloadLink, true)
        ajaxReq.send()
        return () => ajaxReq.abort()
    })
}

/**
 * Downloads a single (non-archive) file and reports it as a stream of
 * `{ state }` and `{ progress }` updates.
 *
 * Progress is `loaded / file.size`. A failure (after the retries performed by
 * `downloadWithRetry`) is signalled on `error$`; the returned stream then
 * stays open without emitting `Finished`.
 */
function downloadSingleFile(
    file: IFile,
    prefix: string = "",
    error$: Observer<void>,
): Observable<Partial<TransferTaskStatus>> {
    const toFraction = (event: { loaded: number }) => event.loaded / file.size
    const reportFailure = () => {
        error$.next()
        return NEVER
    }

    // Shared so the state and progress branches use one underlying download.
    const fraction$ = downloadWithRetry(prefix, file.link, false).pipe(
        map(toFraction),
        catchError(reportFailure),
        share(),
    )
    const lifecycle$ = fraction$.pipe(
        mapTo(TaskState.Running),
        startWith(TaskState.Pending),
        endWith(TaskState.Finished),
    )

    const stateUpdates$ = lifecycle$.pipe(label("state"))
    const progressUpdates$ = fraction$.pipe(label("progress"))
    return merge(stateUpdates$, progressUpdates$)
}

/** Archive notification consumed by `downloadFile`. */
export interface ArchiveResult {
    // Results with an empty link are ignored when waiting for the download to start.
    downloadLink: string
    ok: boolean
    // Compared with the archive response id to pick the download-link result.
    id: string
    // Treated as a percentage: divided by 100, and 100 triggers `onCompleteCompress`.
    progress: number
    // Compared with the archive response id to pick compression-progress results.
    taskID: string
}

/** Arguments of `downloadFile`. */
interface IdownloadFile {
    // Requests an archive for the selected keys; its response supplies the archive id.
    getArchive: OfGrpc<ArchiveRequest, ArchiveResponse>
    files: IFile[]
    // Only used as the saved file name when a single non-directory file is downloaded.
    prefix: string
    // Notified (without a payload) when archiving or downloading fails.
    error$: Observer<void>
    // Source of the result that carries the archive's download link.
    archiveResult$: Observable<ArchiveResult>
    // Source of compression-progress results.
    progressResult$: Observable<ArchiveResult>
    // Called with `<zip name>.zip` when a progress result reports 100.
    onCompleteCompress?: (filename: string) => void
}

/**
 * Downloads the given files and reports the transfer as partial task updates.
 *
 * A single non-directory file is downloaded directly via
 * `downloadSingleFile`. Anything else is archived first: an `ArchiveRequest`
 * is sent with the caller's identity, the matching `archiveResult$` entry
 * supplies the download link, and the zip is then downloaded with retries.
 *
 * Emits `{ state }`, `{ progress }` (download fraction, throttled to one
 * value per 100 ms) and `{ compress }` (compression fraction) updates.
 * Failures are signalled on `error$` instead of erroring the stream.
 */
function downloadFile({
    getArchive,
    files,
    prefix = "",
    error$,
    archiveResult$,
    progressResult$,
    onCompleteCompress,
}: IdownloadFile): Observable<Partial<TransferTaskStatus>> {
    if (files.length === 1 && !files[0].isDir) {
        return downloadSingleFile(files[0], prefix, error$)
    }
    const zipFileName =
        files.length === 1
            ? `${files[0].name}_${Date.now()}`
            : `download_${Date.now()}`
    return from(Auth.currentCredentials()).pipe(
        switchMap((credentials) => {
            const request = new ArchiveRequest()
            request.setKeyList(files.map((x) => x.key))
            request.setIdentityId(credentials.identityId)
            request.setZipFileName(zipFileName)

            const archiving$ = getArchive(request).pipe(
                catchError(() => {
                    error$.next()
                    return EMPTY
                }),
                share(),
            )
            const id$ = archiving$.pipe(map((x) => x.getId()))

            const progress$ = archiveResult$.pipe(
                // Results that arrive before the archive id is known are dropped.
                withLatestFrom(id$),
                filter(([result, id]) => result.id === id),
                filter(([result]) => result.downloadLink.length > 0),
                take(1),
                switchMap(([result]) =>
                    downloadWithRetry(zipFileName, result.downloadLink, true),
                ),
                // `total` is 0 when the response length is not computable;
                // report 0 rather than NaN/Infinity in that case.
                map(({ loaded, total }) => (total > 0 ? loaded / total : 0)),
                catchError((err) => {
                    console.log(err)
                    error$.next()
                    return EMPTY
                }),
                share(),
            )

            const state$ = progress$.pipe(
                // startWith precedes mapTo, so the seed is mapped as well: the
                // first state emitted is Running, and the only other one is
                // Finished when the download completes.
                startWith(TaskState.Pending),
                mapTo(TaskState.Running),
                endWith(TaskState.Finished),
                distinctUntilChanged(),
            )

            return merge(
                state$.pipe(label("state")),
                progress$.pipe(throttleTime(100), label("progress")),
                progressResult$.pipe(
                    withLatestFrom(id$),
                    filter(([result, id]) => result.taskID === id),
                    tap(([result]) => {
                        if (result.progress === 100 && onCompleteCompress)
                            onCompleteCompress(`${zipFileName}.zip`)
                    }),
                    map(([result]) => result.progress / 100),
                    label("compress"),
                ),
            )
        }),
    )
}

/** Destination path of `file`: `prefix` (slash-terminated via `ensureSlash`) followed by the file's path. */
function getFileUploadTo(file: File, prefix: string = "") {
    const directory = ensureSlash(prefix)
    const relativePath = getFilePath(file)
    return directory + relativePath
}

/**
 * Returns the most specific path known for `file`: its
 * `webkitRelativePath`, else its `fullPath`, else its plain `name`.
 */
function getFilePath(file: File) {
    const candidate = file as ContainPathFile
    const relativePath = candidate.webkitRelativePath
    if (relativePath) {
        return relativePath as string
    }
    const fullPath = candidate.fullPath
    if (fullPath) {
        return fullPath as string
    }
    return candidate.name as string
}

/**
 * Returns the first segment of the file's path (everything before the first
 * "/"), or the whole path when it contains no "/".
 *
 * @param keepSlash When truthy, a trailing "/" is kept on the returned segment.
 */
function inferUploadPath(file: File, keepSlash?: boolean) {
    const fullPath = getFilePath(file)
    const firstSlash = fullPath.indexOf("/")
    if (firstSlash === -1) {
        return fullPath
    }
    const topLevel = fullPath.slice(0, firstSlash)
    const suffix = keepSlash ? "/" : ""
    return topLevel + suffix
}

/** Status of one item inside a sub task, as accumulated in `processSubTask`. */
interface SubTaskStatus {
    // Index of the item within the sub task's item list.
    id: number
    error: any
    state: TaskState
    // Fraction in the range produced by `loaded / total`.
    progress: number
}

/** An intent tagged with the numeric id used as its task id by the processors. */
export type Queued<T> = T & { id: number }

/** A directory to create, expressed as its path. */
type CreateDirectory = string
/** One entry of an upload: a file to transfer or a directory to create. */
export type UploadItem = File | CreateDirectory

/** Type guard: an upload item is a directory request exactly when it is a string. */
export function isCreateDirectory(item: UploadItem): item is CreateDirectory {
    const isPathString = typeof item === "string"
    return isPathString
}

/** One upload action requested by the user. */
export interface UploadIntent {
    // A single upload action may contain several entries: for example, the
    // user can drag and drop one file and two directories at once, and a
    // directory entry may expand to many files. The app therefore groups work
    // by intent (the user action) and, within it, by entry: each inner array
    // holds the items of one entry.
    items: UploadItem[][]
    prefix?: string
    onCompleteCompress?: (filename: string) => void
}

/** One download action requested by the user. */
export interface DownloadIntent {
    targets: IFile[]
    prefix?: string
    // Invoked from `finalize` when the download's task stream ends.
    onComplete?(): void
    // Invoked with the zip file name when archive compression reports 100.
    onCompleteCompress?: (filename: string) => void
}

/** Arguments of `newDownloadProcessor`. */
interface InewDownloadProcessor {
    getArchive: OfGrpc<ArchiveRequest, ArchiveResponse>
    // Incoming download intents; each becomes one task identified by its `id`.
    request$: Observable<Queued<DownloadIntent>>
    // Only plain-id references (`isID`) are forwarded to `stateStore`.
    remove$: Observable<TaskRef>
    // Only plain-id references (`isID`) equal to a task's id cancel that task.
    cancel$: Observable<TaskRef>
    archiveResult$: Observable<ArchiveResult>
    progressResult$: Observable<ArchiveResult>
}

/**
 * Turns the stream of download intents into a stream of task-status lists.
 *
 * Every request is run through `downloadFile`, folded into a
 * `TransferTaskStatus` seeded as a pending DOWNLOAD task, and stopped once it
 * is cancelled. `onComplete` runs when a request's stream ends. The per-task
 * states are collected with `stateStore` / `mapToArray`.
 *
 * Each request gets its own error subject, so a failure in one download is
 * only reported on that download's task rather than on every in-flight one.
 */
export function newDownloadProcessor({
    getArchive,
    request$,
    remove$,
    cancel$,
    archiveResult$,
    progressResult$,
}: InewDownloadProcessor) {
    return request$.pipe(
        mergeMap(
            ({ targets, prefix = "", id, onComplete, onCompleteCompress }) => {
                // Scoped to this request (mirrors `newUploadProcessor`).
                const error$ = new Subject<void>()
                return downloadFile({
                    getArchive,
                    files: targets,
                    prefix,
                    error$,
                    archiveResult$,
                    progressResult$,
                    onCompleteCompress,
                }).pipe(
                    terminalActions<Partial<TransferTaskStatus>>(
                        cancel$.pipe(
                            filter(isID),
                            filter((x) => x === id),
                            mapTo({
                                state: TaskState.Cancelled,
                            }),
                        ),
                        error$.pipe(
                            map((error) => ({ state: TaskState.Error, error })),
                        ),
                    ),
                    stateScan<TransferTaskStatus>({
                        id,
                        type: TaskType.DOWNLOAD,
                        state: TaskState.Pending,
                        progress: 0,
                        error: null,
                        path: prefix,
                        isDir: false,
                    }),
                    // Emit the Cancelled state itself, then stop.
                    takeWhile((x) => x.state !== TaskState.Cancelled, true),
                    finalize(() => onComplete && onComplete()),
                )
            },
        ),
        stateStore(remove$.pipe(filter(isID))),
        mapToArray(),
    )
}

/**
 * Collapses a grouped task into a flat `TransferTaskStatus`.
 *
 * With at least one child, `children` is dropped and `isDir`, `path` and
 * `state` are taken from the first child. Without children the group is
 * returned as a non-directory with an empty path.
 */
function flattenToTransferTaskStatus(
    x: GroupedTaskStatus<TransferTaskStatus>,
): TransferTaskStatus {
    const firstChild = x.children[0]
    if (!firstChild) {
        return { ...x, isDir: false, path: "" }
    }
    const { children, ...rest } = x
    return {
        ...rest,
        isDir: firstChild.isDir,
        path: firstChild.path,
        state: firstChild.state,
    }
}

// Localizable messages owned by this module; `exceedUploadSize` is the text
// `tryExplainError` shows for `UploadError.EXCEED_UPLOAD_SIZE`.
const message = defineMessages({
    exceedUploadSize: {
        id: "task.upload.limit",
        defaultMessage: "Single file size can't exceed 4GB",
    },
})

/**
 * Converts a task error into user-facing text.
 *
 * Known `UploadError` values are translated through `intl`; a string error
 * is returned as is; anything else yields `undefined`.
 */
function tryExplainError(intl: IntlShape, error: any): string | undefined {
    switch (error) {
        case UploadError.EXCEED_UPLOAD_SIZE:
            return intl.formatMessage(message.exceedUploadSize)
        case UploadError.INSUFFICIENT_SPACE:
            return intl.formatMessage(driveErrorMessage.insufficientSpace)
        case UploadError.SERVER_BUSY:
            return intl.formatMessage(toastMessage.serverBusy)
        case UploadError.DRIVE_INVALID_DESTINATION:
            return intl.formatMessage(driveErrorMessage.invalidDestination)
        default:
            return typeof error === "string" ? error : undefined
    }
}

/** Type guard that keeps every defined message, including the empty string. */
function hasErrorMessage(x: string | undefined): x is string {
    return typeof x !== "undefined"
}

/** Top-level path segment of an upload item, for both directory requests and files. */
function getUploadItemPath(item: UploadItem) {
    if (isCreateDirectory(item)) {
        const segments = item.split("/")
        return segments[0]
    }
    return inferUploadPath(item)
}

/** Arguments of `checkFileNameConflict`. */
interface IcheckFileNameConflict {
    isExists: OfGrpc<IsExistsRequest, IsExistsResponse>
    // One inner array per entry; only the first item of each entry is checked.
    items: UploadItem[][]
    prefix: string
    // An emission abandons the check.
    cancel$: Observable<void>
    // Notified (without a payload) when the existence request fails.
    error$: Observer<void>
}

/**
 * Asks the server which top-level upload paths already exist.
 *
 * For every entry the path of its first item is added to one
 * `IsExistsRequest`: directory requests contribute their first path segment
 * (slash-terminated), files contribute their top-level segment with any
 * trailing slash kept. The response's `exists` list is emitted.
 *
 * A failed request is signalled on `error$` and the stream then stays silent;
 * `cancel$` ends it. The result is shared between subscribers.
 */
function checkFileNameConflict({
    isExists,
    items,
    prefix,
    cancel$,
    error$,
}: IcheckFileNameConflict): Observable<string[]> {
    const request = new IsExistsRequest()

    for (const entry of items) {
        // only directory has multiple files
        const first = entry[0]
        let path: string
        if (isCreateDirectory(first)) {
            path = ensureSlash(ensureSlash(prefix) + first.split("/")[0])
        } else {
            path = ensureSlash(prefix) + inferUploadPath(first, true)
        }
        request.addPath(path)
    }

    const reportFailure = () => {
        error$.next()
        return NEVER
    }

    return isExists(request).pipe(
        map((response) => response.getExistsList()),
        catchError(reportFailure),
        takeUntil(cancel$),
        share(),
    )
}

/** Arguments of `resolveName`. */
interface IresolveName {
    // Receives the conflicting path and answers with the path to use instead.
    getNewName: OfGrpc<PathMessage, PathMessage>
    file: UploadItem
    prefix: string
    // An emission abandons the request.
    cancel$: Observable<void>
    // Notified (without a payload) when the request fails.
    error$: Observer<void>
}

/**
 * Requests a replacement path for an upload item from `getNewName`.
 *
 * The path sent is `prefix` plus the whole directory path (slash-terminated)
 * for a directory request, or `prefix` plus the file's top-level segment with
 * any trailing slash kept. Emits the path from the response.
 *
 * A failed request is signalled on `error$` and the stream then stays silent;
 * `cancel$` ends it. The result is shared between subscribers.
 */
function resolveName({
    getNewName,
    file,
    prefix,
    cancel$,
    error$,
}: IresolveName): Observable<string> {
    let path: string
    if (isCreateDirectory(file)) {
        path = ensureSlash(ensureSlash(prefix) + file)
    } else {
        path = ensureSlash(prefix) + inferUploadPath(file, true)
    }

    const request = new PathMessage()
    request.setPath(path)

    const reportFailure = () => {
        error$.next()
        return NEVER
    }

    return getNewName(request).pipe(
        map((response) => response.getPath()),
        catchError(reportFailure),
        takeUntil(cancel$),
        share(),
    )
}

/**
 * Replaces the first segment of `path` with `newName`, keeping everything
 * from the first "/" onwards. A path without "/" becomes `newName`.
 */
function replaceFileName(path: string, newName: string): string {
    const firstSlash = path.indexOf("/")
    return firstSlash === -1 ? newName : newName + path.slice(firstSlash)
}

/**
 * Applies a conflict-resolved name to an upload item.
 *
 * A directory request is simply replaced by `name`. A `File` is modified in
 * place by defining own properties on it and is then returned:
 * - when its path contains "/", `name`, `webkitRelativePath` and `fullPath`
 *   are redefined with the first path segment swapped for `name`;
 * - otherwise `name` is redefined and `type` is pinned to its current value.
 *
 * NOTE(review): in the nested case `file.name` holds no "/", so it ends up
 * equal to `name` (the new top-level name) — confirm that is intended.
 * NOTE(review): the properties are defined with only `value`, i.e. read-only
 * and non-configurable, so renaming the same `File` again to a different
 * name would throw.
 */
function updateName(name: string, file: UploadItem) {
    if (isCreateDirectory(file)) {
        return name
    }

    const currentPath = getFilePath(file)
    const descriptors: PropertyDescriptorMap = currentPath.includes("/")
        ? {
              name: { value: replaceFileName(file.name, name) },
              webkitRelativePath: {
                  value: replaceFileName(currentPath, name),
              },
              fullPath: { value: replaceFileName(currentPath, name) },
          }
        : {
              name: { value: replaceFileName(file.name, name) },
              type: { value: file.type },
          }
    Object.defineProperties(file, descriptors)
    return file
}

/** Arguments of `getUploadFilePipe`. */
interface IgetUploadFilePipe {
    file: UploadItem
    // Operator factory that runs the per-item work.
    // NOTE(review): presumably the concurrency-limited strand — confirm at the call site.
    worker: <F, T>(project: (f: F) => Observable<T>) => OperatorFunction<F, T>
    prefix: string
    mkdir: OfGrpc<MkdirRequest, MkdirResponse>
    // An emission switches the item to the Cancelled state.
    cancel$: Observable<void>
    uploaded$: Observable<UploadedEvent>
    identity$: Observable<string>
    error$: Observer<UploadError | void>
    stopSubTask$: Subject<Progress>
    uploadFileInOrder: UploadFileInOrder
    taskID: number
    subTaskID: number
    uploadFileID: string
}

/**
 * Builds the status stream for one upload item.
 *
 * The item is processed through `worker`: a directory request issues a
 * `mkdir` call (reporting `SERVER_BUSY` on `error$`), a file is uploaded with
 * `uploadFileWorker` and mapped to `{ progress }` updates. The stream starts
 * with `Pending`, ends with `Finished`, and `cancel$` is registered as a
 * terminal action that yields `Cancelled`. The result is shared.
 */
const getUploadFilePipe = ({
    file,
    worker,
    prefix,
    mkdir,
    cancel$,
    uploaded$,
    identity$,
    error$,
    stopSubTask$,
    uploadFileInOrder,
    taskID,
    subTaskID,
    uploadFileID,
}: IgetUploadFilePipe) => {
    return of(file).pipe(
        worker<
            UploadItem,
            | {
                  progress: number
              }
            | {
                  error: UploadError
              }
        >((x) => {
            if (isCreateDirectory(x)) {
                // Directory request: create it on the server; nothing is
                // emitted, only completion (or an error report) matters.
                const request = new MkdirRequest()
                request.setKey(ensureSlash(prefix) + x)
                return mkdir(request).pipe(
                    map((x) => x.getStatus()),
                    tap((status) => {
                        status === MkdirResponse.ErrorCode.SERVER_BUSY &&
                            error$.next(UploadError.SERVER_BUSY)
                    }),
                    ignoreElements(),
                )
            }
            return uploadFileWorker({
                uploadTo: getFileUploadTo(x, prefix),
                file: x,
                uploaded$,
                identity$,
                error$,
                stopSubTask$,
                uploadFileInOrder,
                taskID,
                subTaskID,
                uploadFileID,
            }).pipe(
                map(({ loaded, total }) => ({
                    progress: loaded / total,
                })),
                // Bracket the transfer so progress always begins at 0 and ends at 1.
                startWith({ progress: 0 }),
                endWith({ progress: 1 }),
            )
        }),
        startWith({ state: TaskState.Pending }),
        endWith({ state: TaskState.Finished }),
        terminalActions<Partial<SubTaskStatus>>(
            cancel$.pipe(mapTo({ state: TaskState.Cancelled })),
        ),
        share(),
    )
}

// Module-level channel on which `processSubTask` publishes the name chosen
// when a conflict is resolved with "keep both", tagged with the task and sub
// task it belongs to. It is shared by every sub task, so each event carries
// `taskID` and `subTaskID` for listeners to compare.
const newFileName$ = new Subject<{
    name: string
    subTaskID: number
    taskID: number
}>()

/** Arguments of `processSubTask`. */
interface IprocessSubTask {
    prefix: string
    // Items of one entry; the first item determines the displayed path and the name to resolve.
    items: Array<UploadItem>
    subTaskID: number
    worker: <F, T>(project: (f: F) => Observable<T>) => OperatorFunction<F, T>
    // Only the first emission is honoured.
    cancel$: Observable<void>
    error$: Subject<UploadError | void>
    stopSubTask$: Subject<Progress>
    // Called for every item whose destination path is reported as already existing.
    conflict: ({ taskRef, filePath }: Conflict) => void
    // Indexed by `subTaskID`; its emission carries the user's conflict resolution.
    resolves$: ResolveSubjects
    getNewName: OfGrpc<PathMessage, PathMessage>
    mkdir: OfGrpc<MkdirRequest, MkdirResponse>
    // Emits the list of destination paths that already exist.
    checkFileName$: Observable<string[]>
    uploaded$: Observable<UploadedEvent>
    identity$: Observable<string>
    uploadFileInOrder: UploadFileInOrder
    taskID: number
}

/**
 * Runs one sub task (one uploaded entry: a file, a directory request, or the
 * files of a dropped directory) and reports it as a single task status.
 *
 * For every item the existing-path list from `checkFileName$` is consulted:
 * - no conflict: the item is uploaded straight away;
 * - conflict: `conflict` is called and the upload waits for
 *   `resolves$[subTaskID]`. With `ConflictResolveType.Keepboth` a new name is
 *   requested, published on `newFileName$` and applied to the item before
 *   uploading; any other resolution uploads the item unchanged.
 *
 * Progress is the average of the items' progress (finished, cancelled and
 * failed items count as 1), sampled at most every 300 ms.
 *
 * @returns A tuple of the sub task's status stream and a stream of the
 * destination paths of items that finished.
 */
function processSubTask({
    prefix,
    items,
    subTaskID,
    worker,
    cancel$,
    error$,
    stopSubTask$,
    conflict,
    resolves$,
    getNewName,
    mkdir,
    checkFileName$,
    uploaded$,
    identity$,
    uploadFileInOrder,
    taskID,
}: IprocessSubTask): [Observable<TransferTaskStatus>, Observable<string>] {
    cancel$ = cancel$.pipe(take(1))
    const resolveName$ = resolveName({
        getNewName,
        file: items[0],
        prefix,
        cancel$,
        error$,
    })
    const uploadJobs = items.map((item) => {
        const uploadFileID = getRandomID()
        // Single place that builds the upload pipeline for this item, used by
        // the conflict-free, renamed and not-renamed paths alike.
        const startUpload = (file: UploadItem) =>
            getUploadFilePipe({
                file,
                worker,
                prefix,
                mkdir,
                cancel$,
                uploaded$,
                identity$,
                error$,
                stopSubTask$,
                uploadFileInOrder,
                taskID,
                subTaskID,
                uploadFileID,
            })
        const path = isCreateDirectory(item)
            ? ensureSlash(ensureSlash(prefix) + item)
            : ensureSlash(prefix) + inferUploadPath(item as File, true)
        const uploadFile$ = checkFileName$.pipe(
            tap((conflictPaths) => {
                conflictPaths.includes(path) &&
                    conflict({
                        taskRef: subTaskID,
                        filePath: isCreateDirectory(item)
                            ? item
                            : getFilePath(item as File),
                        file: item,
                    })
            }),
            switchMap((conflictPaths) =>
                iif(
                    () => conflictPaths.includes(path),
                    of(item).pipe(
                        // Hold the item until the user resolves the conflict,
                        // then read the resolution that was chosen.
                        delayWhen(() => resolves$[subTaskID]),
                        withLatestFrom(resolves$[subTaskID]),
                        switchMap(([x, y]) => {
                            // only resolve type of keepboth will rename file/folder
                            if (y.type === ConflictResolveType.Keepboth) {
                                return resolveName$.pipe(
                                    map((path) => basename(path)),
                                    tap((name) => {
                                        newFileName$.next({
                                            name,
                                            subTaskID,
                                            taskID,
                                        })
                                    }),
                                    map((name) => updateName(name, x)),
                                    switchMap((x) => startUpload(x)),
                                )
                            }

                            return startUpload(x)
                        }),
                    ),

                    startUpload(item),
                ),
            ),
            takeUntil(cancel$),
            share(),
        )

        return {
            state$: uploadFile$,
            // NOTE(review): the path below is computed while the pipeline is
            // being built, i.e. before any "keep both" rename is applied, so
            // it reports the original destination — confirm that is intended.
            newPath$: uploadFile$.pipe(
                filter((x) => x.state === TaskState.Finished),
                mapTo(
                    isCreateDirectory(item)
                        ? ensureSlash(prefix) + item
                        : getFileUploadTo(item, prefix),
                ),
                take(1),
            ),
        }
    })
    const aggregated$ = combineLatest(
        uploadJobs.map((x, id) =>
            x.state$.pipe(
                stateScan<SubTaskStatus>({
                    id,
                    error: undefined,
                    state: TaskState.Pending,
                    progress: 0,
                }),
            ),
        ),
    ).pipe(share())

    const progress$ = aggregated$.pipe(
        throttleTime(300, asyncScheduler, { trailing: true }),
        map((x) =>
            average(
                x.map((x) =>
                    x.state === TaskState.Finished ||
                    x.state === TaskState.Cancelled ||
                    x.state === TaskState.Error
                        ? 1
                        : x.progress,
                ),
            ),
        ),
    )
    const origin$ = merge(
        progress$.pipe(label("progress")),
        progress$.pipe(mapTo(TaskState.Running), label("state"), take(1)),
        newFileName$.pipe(
            // `newFileName$` is shared by every sub task. React only to
            // renames addressed to this one; reacting to the others would
            // re-emit the original path and undo a rename already applied.
            filter((x) => x.subTaskID === subTaskID && x.taskID === taskID),
            map((x) => x.name),
            label("path"),
            // Stop listening once progress reporting has completed.
            takeUntil(progress$.pipe(ignoreElements(), endWith(undefined))),
        ),
    ).pipe(endWith({ state: TaskState.Finished }))

    const newPath$ = merge(...uploadJobs.map((x) => x.newPath$))
    return [
        origin$.pipe(
            terminalActions<Partial<TransferTaskStatus>>(
                cancel$.pipe(mapTo({ state: TaskState.Cancelled })),
                error$.pipe(
                    map((error) => ({ state: TaskState.Error, error })),
                ),
            ),
            // Emit the seed status immediately, before any progress arrives.
            startWith<Partial<TransferTaskStatus>>({}),

            stateScan<TransferTaskStatus>({
                id: subTaskID,
                isDir:
                    items.length > 1 ||
                    isCreateDirectory(items[0]) ||
                    getFilePath(items[0]).includes("/"),
                path: getUploadItemPath(items[0]),
                state: TaskState.Pending,
                progress: 0,
                error: undefined,
                type: TaskType.UPLOAD,
            }),
        ),
        newPath$,
    ]
}

/**
 * Arithmetic mean of `numbers`.
 *
 * @returns The mean, or 0 for an empty array (instead of the NaN that
 * dividing by a zero length would produce).
 */
function average(numbers: number[]): number {
    if (numbers.length === 0) {
        return 0
    }
    return numbers.reduce((acc, c) => acc + c, 0) / numbers.length
}

/** Streams returned by `newUploadProcessor`. */
interface UploadProcessorOutput {
    // NOTE(review): presumably the user-facing error texts — confirm in `newUploadProcessor`.
    errors$: Observable<string>
    tasks$: Observable<UITaskStatus[]>
    // NOTE(review): presumably the destination paths of finished items — confirm in `newUploadProcessor`.
    newPaths$: Observable<string>
}

/** Upload confirmation consumed by `uploadFileWorker`. */
interface UploadedEvent {
    // Compared with the upload link's object id to find the matching upload.
    id: string
    error?: any
}

/** Arguments of `newUploadProcessor`. */
interface InewUploadProcessor {
    // `getUsage` and `getUploadLinks` are handed to `UploadFileInOrder`.
    getUsage: OfGrpc<Empty, QueryUsedResponse>
    getUploadLinks: OfGrpc<GetUploadLinksRequest, GetUploadLinksResponse>
    getNewName: OfGrpc<PathMessage, PathMessage>
    mkdir: OfGrpc<MkdirRequest, MkdirResponse>
    isExists: OfGrpc<IsExistsRequest, IsExistsResponse>
    // Incoming upload intents; each becomes one task identified by its `id`.
    request$: Observable<Queued<UploadIntent>>
    remove$: Observable<TaskRef>
    // A plain id cancels a whole task; a nested `[taskID, subTaskID]` reference cancels one sub task.
    cancel$: Observable<TaskRef>
    conflict: ({ taskRef, filePath }: Conflict) => void
    resolves$: ResolveSubjects
    uploaded$: Observable<UploadedEvent>
    identity$: Observable<string>
    intl: IntlShape
    // Passed to `newStrand`; defaults to 50 when omitted.
    concurrentLimit?: number
}

/**
 * Builds the upload pipeline: every request from `request$` becomes one
 * grouped task whose sub-tasks (one per entry of `items`) are created with
 * `processSubTask`.
 *
 * @returns
 * - `tasks$`: the current task list, with tasks removed on `remove$`;
 * - `errors$`: messages for task-level errors that `tryExplainError` can explain;
 * - `newPaths$`: the merged new-path streams of all sub-tasks.
 */
export function newUploadProcessor({
    concurrentLimit = 50,
    intl,
    identity$,
    uploaded$,
    getUsage,
    getUploadLinks,
    getNewName,
    mkdir,
    isExists,
    request$,
    remove$,
    cancel$,
    conflict,
    resolves$,
}: InewUploadProcessor): UploadProcessorOutput {
    // A single strand is created here and handed to every sub-task of every
    // request as its `worker`.
    const strand = newStrand(concurrentLimit)
    const uploadFileInOrder = new UploadFileInOrder({
        getUploadLinks,
        getUsage,
    })

    const tasks = request$.pipe(
        map(({ items, prefix, id }) => {
            uploadFileInOrder.setUploadItemByTaskID(items, id)

            // Shared by the name-conflict check and all sub-tasks of this request.
            const error$ = new Subject<UploadError | void>()

            const checkFileName$ = checkFileNameConflict({
                isExists,
                items,
                prefix: prefix || "",
                cancel$: merge(
                    // NOTE(review): this branch accepts any nested ID of
                    // length <= 1 without comparing it to `id` — confirm
                    // that is intended.
                    cancel$.pipe(
                        filter(isNestedID),
                        filter((x) => x.length <= 1),
                    ),
                    cancel$.pipe(
                        filter(isID),
                        filter((x) => x === id),
                    ),
                ).pipe(mapTo(undefined)),
                error$,
            })
            // One [status stream, new-path stream] pair per entry of `items`;
            // the array index doubles as the sub-task ID.
            const subTasks: Array<
                [Observable<TransferTaskStatus>, Observable<string>]
            > = items.map((subTask, subTaskID) => {
                const stopSubTask$ = new Subject<Progress>()
                // Cancellation addressed to this sub-task: [taskID, subTaskID, ...].
                const cancelBySubTaskID$ = cancel$.pipe(
                    filter(isNestedID),
                    filter((x) => x.length >= 2),
                    filter((x) => x[0] === id),
                    filter((x) => x[1] === subTaskID),
                )
                // Cancellation addressed to the whole task.
                const cancelByTaskID$ = cancel$.pipe(
                    filter(isID),
                    filter((x) => x === id),
                )
                uploadFileInOrder.handlePrepareCancelTask(
                    merge(cancelBySubTaskID$, cancelByTaskID$),
                )
                return processSubTask({
                    prefix: prefix || "",
                    items: subTask,
                    subTaskID,
                    worker: strand,
                    cancel$: merge(cancelBySubTaskID$, cancelByTaskID$).pipe(
                        mapTo(undefined),
                    ),
                    error$,
                    stopSubTask$,
                    conflict,
                    resolves$,
                    getNewName,
                    mkdir,
                    checkFileName$,
                    uploaded$,
                    identity$,
                    uploadFileInOrder,
                    taskID: id,
                })
            })
            // Each status stream is shared because it is consumed several
            // times below (stateUpdates$, progress$, list$).
            const updates = subTasks.map((x) => x[0].pipe(share()))

            const stateUpdates$ = merge(...updates)

            // Task progress is the mean of the sub-tasks' latest progress values.
            const progress$ = combineLatest(updates).pipe(
                share(),
                throttleTime(300, asyncScheduler, { trailing: true }),
                map((x) => average(x.map((x) => x.progress))),
            )
            // Latest status of every sub-task, throttled to one emission per 300 ms.
            const list$ = combineLatest(updates).pipe(
                throttleTime(300, asyncScheduler, { trailing: true }),
            )

            const running$ = list$.pipe(
                map((x) => {
                    return x.filter((x) => x.state === TaskState.Running).length
                }),
            )
            const finished$ = list$.pipe(
                map(
                    (x) =>
                        x.filter((x) => x.state === TaskState.Finished).length,
                ),
            )

            // Running / finished counters folded into one { running, finished } object.
            const count$ = merge(
                running$.pipe(label("running")),
                finished$.pipe(label("finished")),
            ).pipe(stateScan({ running: 0, finished: 0 }))

            // Running while progress is emitted, Finished once progress$
            // completes; a cancel for this task ID feeds Cancelled through
            // `terminalActions`.
            const state$ = progress$.pipe(
                mapTo(TaskState.Running),
                endWith(TaskState.Finished),
                terminalActions<TaskState>(
                    cancel$.pipe(
                        filter(isID),
                        filter((x) => x === id),
                        mapTo(TaskState.Cancelled),
                    ),
                ),
            )
            // Every defined `error` reported by any sub-task status update.
            const latestError$ = stateUpdates$.pipe(
                map((x) => x.error),
                filter((x) => x !== undefined),
            )
            return [
                // The first `children` value is emitted on its own before the
                // merged field updates are subscribed.
                concat(
                    list$.pipe(label("children"), take(1)),
                    merge(
                        list$.pipe(label("children")),
                        progress$.pipe(label("progress")),
                        state$.pipe(label("state")),
                        count$.pipe(label("count")),
                        latestError$.pipe(label("error")),
                    ),
                ).pipe(
                    // Seed value for the grouped task; `stateScan` presumably
                    // folds the labelled updates into it — see ./context/task.
                    stateScan<GroupedTaskStatus<TransferTaskStatus>>({
                        id,
                        type: TaskType.UPLOAD,
                        error: undefined,
                        state: TaskState.Pending,
                        progress: 0,
                        children: [],
                        count: { running: 0, finished: 0 },
                        prefix,
                    }),
                    share(),
                    throttleTime(300, asyncScheduler, { trailing: true }),
                ),
                merge(...subTasks.map((x) => x[1])),
            ] as [
                Observable<GroupedTaskStatus<TransferTaskStatus>>,
                Observable<string>,
            ]
        }),
        // Shared so the two consumers below do not each re-run the map above.
        share(),
    )
    const taskStateUpdate$ = tasks.pipe(mergeMap((task) => task[0]))
    const newPaths$ = tasks.pipe(mergeMap((task) => task[1]))
    return {
        tasks$: taskStateUpdate$.pipe(
            stateStore(remove$.pipe(filter(isID))),
            mapToArray(),
            // Groups with at most one child are flattened to a plain transfer status.
            map((x) =>
                x.map((x) =>
                    x.children.length <= 1 ? flattenToTransferTaskStatus(x) : x,
                ),
            ),
            throttleTime(100, asyncScheduler, { trailing: true }),
        ),
        errors$: taskStateUpdate$.pipe(
            map((x) => x.error),
            filter((x) => x !== undefined),
            map(tryExplainError.bind(null, intl)),
            filter(hasErrorMessage),
        ),
        newPaths$,
    }
}

/**
 * Issues the remove-background request for `selectedFiles` and never lets the
 * returned stream error: a failure is replaced by a stub object that only
 * exposes `getStatus()`.
 *
 * - Deadline exceeded: the stub reports `StatusCode.DEADLINE_EXCEEDED` and
 *   nothing is signalled, because the backend keeps processing.
 * - Any other failure: `{ timestamp }` is pushed to `error$` and the stub
 *   reports `undefined`.
 */
function startRemoveBackgroundImages(
    selectedFiles: IFile[],
    removeBackgroundImages: (
        request: RemoveBackgroundRequest,
        metadata?: any,
        options?: any,
    ) => Observable<RemoveBackgroundResponse>,
    timestamp: number,
    error$: Subject<{ timestamp: number; statusCode?: number }>,
) {
    const request = getRemoveBackgroundRequest(selectedFiles, timestamp, true)
    const response$ = removeBackgroundImages(
        request,
        {},
        { httpKey: HttpKey.REMOVE_IMAGE_BACKGROUND },
    )

    return response$.pipe(
        catchError((e: any) => {
            // A timeout is not fatal: the backend is still running the job.
            if (e.code === StatusCode.DEADLINE_EXCEEDED) {
                return of({ getStatus: () => StatusCode.DEADLINE_EXCEEDED })
            }
            // Everything else stops the whole process.
            error$.next({ timestamp })
            return of({ getStatus: () => undefined })
        }),
    )
}

/** Arguments of `removeBackgroundTaskProcess`. */
type TremoveBackgroundTaskProcess = {
    /** Files sent in the remove-background request. */
    selectedFiles: IFile[]
    /** gRPC call polled for the job's progress. */
    getTaskCount: OfGrpc<GetTaskCountRequest, GetTaskCountResponse>
    /** Identifies the job: set on the task-count request and echoed in `done$` / `error$` events. */
    timestamp: number
    /** gRPC call that starts the job. */
    removeBackgroundImages: OfGrpc<
        RemoveBackgroundRequest,
        RemoveBackgroundResponse
    >
    /** Receives `{ timestamp }` once the polled progress reaches 1; also ends polling. */
    done$: Subject<{ timestamp: number }>
    /** Receives `{ timestamp, statusCode }` when the start response carries a failing status; also ends polling. */
    error$: Subject<{ timestamp: number; statusCode?: number }>
}

/**
 * Starts a remove-background job and then reports its progress by polling
 * `getTaskCount` with the job's `timestamp`.
 *
 * The returned observable emits partial status updates (`state`, `progress`,
 * `error`). It emits a single Error update and completes when the start
 * response carries a status other than `IMAGE_PROCESSING_OK` or
 * `DEADLINE_EXCEEDED`; otherwise it forwards the polling stream.
 *
 * NOTE(review): the subscriber function returns no teardown, so the
 * subscription to the start request is not explicitly released when the
 * returned observable is unsubscribed — confirm this is acceptable.
 */
function removeBackgroundTaskProcess({
    selectedFiles,
    getTaskCount,
    timestamp,
    removeBackgroundImages,
    done$,
    error$,
}: TremoveBackgroundTaskProcess): Observable<{
    state?: TaskState
    progress?: number
    error?: boolean
}> {
    return new Observable((obsever) => {
        // NOTE(review): `done$` is passed in the position that
        // `startRemoveBackgroundImages` names `error$`, so a non-timeout
        // request failure is signalled on `done$` — confirm this is intended.
        startRemoveBackgroundImages(
            selectedFiles,
            removeBackgroundImages,
            timestamp,
            done$,
        ).subscribe((res) => {
            const statusCode = res.getStatus()
            // Anything other than OK or a timeout is a failed start.
            if (
                statusCode !== ImageProcessingErrorCode.IMAGE_PROCESSING_OK &&
                statusCode !== StatusCode.DEADLINE_EXCEEDED
            ) {
                obsever.next({
                    state: TaskState.Error,
                    progress: 1,
                    error: true,
                })
                obsever.complete()
                error$.next({ timestamp, statusCode })
                return
            }
            const taskCountRequest = new GetTaskCountRequest()
            taskCountRequest.setTimestamp(timestamp)

            const getCount$ = getTaskCount(taskCountRequest).pipe(
                // A failing poll is retried up to 10 times.
                retry(10),
                map((res) => {
                    const progress = res.getProgress()
                    if (progress === 1) {
                        const errorCode = res.getErrorCode()
                        // progress === 1 means the task is completed.
                        setTimeout(() => {
                            // Deferred so the final status returned below is
                            // delivered before `done$` ends the polling stream.
                            done$.next({ timestamp })
                        }, 0)
                        // Final status: PartialFailure or Finished.
                        return {
                            state:
                                errorCode ===
                                ImageProcessingErrorCode.IMAGE_PROCESSING_PARTIAL_FAILURE_OF_TASKS
                                    ? TaskState.PartialFailure
                                    : TaskState.Finished,
                            progress,
                        }
                    }
                    return {
                        state: TaskState.Running,
                        progress,
                    }
                }),
                // NOTE(review): this sits before `repeat`, so its comparison
                // presumably restarts with every re-subscription — confirm
                // whether de-duplication across polls was intended.
                distinctUntilKeyChanged("progress"),
                // Poll again after each completion: count 600, 2000 ms delay.
                repeat({ count: 600, delay: 2000 }),
                takeUntil(error$),
                takeUntil(done$),
                // A poll that still fails after the retries ends the stream
                // with progress 1 and no state change.
                catchError(() => {
                    return of({
                        progress: 1,
                    })
                }),
            )
            getCount$.pipe(takeUntil(error$)).subscribe(obsever)
        })
    })
}

/**
 * Payload of one remove-background request; `initRemoveBackgroundImageProcessor`
 * reads these fields (together with the queue `id`) from each `request$` item.
 */
interface IremoveBackgroundImageProcess {
    /** Files whose background should be removed. */
    selectedFiles: IFile[]
    /** Subject forwarded to `removeBackgroundTaskProcess` as its `done$`. */
    done$: Subject<{ timestamp: number }>
    /** Subject forwarded to `removeBackgroundTaskProcess` as its `error$`; each emission is also mapped to an Error status. */
    error$: Subject<{ timestamp: number; statusCode?: number }>
}

/** Arguments of `initRemoveBackgroundImageProcessor`. */
type TremoveBackgroundImagesProcessor = {
    /** Queued remove-background requests. */
    request$: Observable<Queued<IremoveBackgroundImageProcess>>
    /** gRPC call forwarded to `removeBackgroundTaskProcess`. */
    getTaskCount: OfGrpc<GetTaskCountRequest, GetTaskCountResponse>
    /** gRPC call forwarded to `removeBackgroundTaskProcess`. */
    removeBackgroundImages: OfGrpc<
        RemoveBackgroundRequest,
        RemoveBackgroundResponse
    >
    /** Passed to `stateStore` on the emitted task list. */
    remove$: Observable<TaskRef>
}

/**
 * Turns queued remove-background requests into a stream of task lists.
 *
 * For each request a Pending task is announced immediately through an
 * internal subject, then the actual job (`removeBackgroundTaskProcess`) is
 * run; jobs are flattened with a concurrency of 1.
 */
export function initRemoveBackgroundImageProcessor({
    request$,
    getTaskCount,
    remove$,
    removeBackgroundImages,
}: TremoveBackgroundImagesProcessor): Observable<TransferTaskStatus[]> {
    const concurrency = 1
    const initQueueTask$ = new Subject<TransferTaskStatus>()

    const tasks$ = request$.pipe(
        map(({ id, selectedFiles, done$, error$ }) => {
            const sourcePaths = selectedFiles.map((f: IFile) => f.key)
            // Job timestamp in whole seconds.
            const timestamp = Math.floor(Date.now() / 1000)
            const targetPaths = getRemoveBackgroundImageSavePaths(
                sourcePaths,
                timestamp,
            )

            const isMultipleFiles = selectedFiles.length > 1
            // Several files get a combined display name, a single file its own name.
            const path = isMultipleFiles
                ? `${multipleName(
                      getValidRemoveImageBackgroundFiles(selectedFiles),
                  )}`
                : selectedFiles[0].name

            const firstIsDir = selectedFiles[0].isDir
            const firstTarget = targetPaths[0]
            const initTask = {
                id,
                type: TaskType.REMOVE_IMAGE_BACKGROUND,
                state: TaskState.Pending,
                progress: 0,
                path,
                isClickable: !isMultipleFiles,
                removeBackgroundPath: firstIsDir
                    ? removeTrailingSlash(firstTarget)
                    : firstTarget,
                isDir: firstIsDir,
                error: null,
                prefix: "",
            }
            // Announce the task right away, before its job is subscribed.
            initQueueTask$.next(initTask)

            // First external error (if any, and only before done$) becomes
            // an Error status.
            const onError$ = error$.pipe(
                map(() => ({
                    state: TaskState.Error,
                    progress: 1,
                    error: true,
                })),
                takeUntil(done$),
                take(1),
            )
            const process$ = removeBackgroundTaskProcess({
                selectedFiles,
                removeBackgroundImages,
                getTaskCount,
                timestamp,
                error$,
                done$,
            })

            return merge(process$, onError$).pipe(
                stateScan<TransferTaskStatus>(initTask),
                // Once an error is recorded, only Error-state statuses pass.
                filter(
                    ({ error, state }) => !error || state === TaskState.Error,
                ),
            )
        }),
        mergeMap((task) => task, concurrency),
    )

    return merge(tasks$, initQueueTask$).pipe(stateStore(remove$), mapToArray())
}

/**
 * Deletes `fileIds` in chunks of `DELETE_FILE_CHUNK_SIZE`, one chunk after
 * another, issuing the `drop` calls of a chunk together.
 *
 * Emits labelled `state` and `progress` updates. Whenever a chunk ends
 * without a failed `drop`, its ids are published on `completeDropFile$` and
 * removed from the deleting-ids state. Any emission of `cancel$` stops the
 * work.
 */
function deleteFile({
    fileIds,
    drop,
    completeDropFile$,
    cancel$,
    setDeletingFileIds,
}: {
    cancel$: Observable<TaskRef>
    fileIds: string[]
    drop: (
        request: DropRequest,
        metadata?: any,
        options?: any,
    ) => Observable<Empty>
    completeDropFile$: Subject<string[]>
    setDeletingFileIds: React.Dispatch<React.SetStateAction<string[]>>
}) {
    const totalFiles = fileIds.length

    // Split the ids into consecutive chunks.
    const fileChunks: string[][] = []
    for (
        let start = 0;
        start < fileIds.length;
        start += DELETE_FILE_CHUNK_SIZE
    ) {
        fileChunks.push(fileIds.slice(start, start + DELETE_FILE_CHUNK_SIZE))
    }

    // Delay UI update slightly to prevent layout shift when deleting small files batches
    const uiDelay = totalFiles < DELETE_FILE_CHUNK_SIZE ? 300 : 0

    // Set by any failed drop of the chunk in flight; reset when that chunk ends.
    let chunkFailed = false

    // Drop a single file; a failure is swallowed and only recorded in the flag.
    const dropOne = (id: string) => {
        const request = new DropRequest()
        request.setId(id)
        return drop(request).pipe(
            catchError(() => {
                chunkFailed = true
                return EMPTY
            }),
            takeUntil(cancel$),
        )
    }

    // Drop one chunk and emit how many requests it completed.
    const dropChunk = (selectedIds: string[]) =>
        forkJoin(selectedIds.map((id) => dropOne(id))).pipe(
            mergeMap((completedRequests) =>
                timer(uiDelay).pipe(map(() => completedRequests.length)),
            ),
            takeUntil(cancel$),
            finalize(() => {
                if (!chunkFailed) {
                    completeDropFile$.next(selectedIds)
                    setDeletingFileIds((prev) =>
                        prev.filter((id) => !selectedIds.includes(id)),
                    )
                }
                chunkFailed = false
            }),
        )

    const progress$ = from(fileChunks).pipe(
        concatMap(dropChunk),
        takeUntil(cancel$),
        // Running total of deleted files, then the fraction of the whole.
        scan((sum, deletedInChunk) => sum + deletedInChunk, 0),
        map((deleted) => deleted / totalFiles),
        throttleTime(100),
        distinctUntilChanged(),
        shareReplay(1),
    )

    const state$ = progress$.pipe(
        map((progress) =>
            progress > 0 ? TaskState.Running : TaskState.Pending,
        ),
        startWith(TaskState.Pending),
        endWith(TaskState.Finished),
        distinctUntilChanged(),
    )

    return merge(state$.pipe(label("state")), progress$.pipe(label("progress")))
}

/**
 * Describes one delete request: the ids to delete and a path.
 *
 * NOTE(review): presumably the payload queued for `deleteProcessor`, whose
 * `request$` items carry the same two fields plus `id` — confirm with callers.
 */
interface IDeleteProcess {
    fileIds: string[]
    path: string
}
/** Arguments of `deleteProcessor`. */
type DeleteProcessor = {
    /** Forwarded to `deleteFile`, which publishes the ids of each chunk that ended without a failed drop. */
    completeDropFile$: Subject<string[]>
    /** Call that deletes a single file; forwarded to `deleteFile`. */
    drop: (
        request: DropRequest,
        metadata?: any,
        options?: any,
    ) => Observable<Empty>
    /** Incoming delete requests; each becomes one DELETE task. */
    request$: Observable<{
        id: number
        path: string
        fileIds: string[]
    }>
    /** React state setter forwarded to `deleteFile`. */
    setDeletingFileIds: React.Dispatch<React.SetStateAction<string[]>>
    /** Passed to `stateStore` on the emitted task list. */
    remove$: Observable<TaskRef>
    /** Cancellation requests; a plain ID equal to a task's `id` cancels that task. */
    cancel$: Observable<TaskRef>
}
/**
 * Turns delete requests into a stream of task lists.
 *
 * Every request is announced immediately as a Pending DELETE task and then
 * executed with `deleteFile`; requests are executed one at a time.
 *
 * Cancellation is scoped to the task: only a `cancel$` emission that is a
 * plain ID equal to the task's `id` stops its deletion and marks it
 * Cancelled. Cancel events addressed to other tasks no longer interrupt the
 * deletion that is currently running.
 *
 * @returns the current list of delete tasks, with entries removed on `remove$`.
 */
export function deleteProcessor({
    completeDropFile$,
    request$,
    remove$,
    cancel$,
    setDeletingFileIds,
    drop,
}: DeleteProcessor): Observable<TransferTaskStatus[]> {
    // Nothing in this function pushes to this subject; it only feeds the
    // Error branch of `terminalActions` below.
    const error$ = new Subject<void>()
    const initQueueTask$ = new Subject<TransferTaskStatus>()
    const task$ = request$.pipe(
        map(({ fileIds, id, path }) => {
            const initTask = {
                id,
                type: TaskType.DELETE,
                state: TaskState.Pending,
                progress: 0,
                isClickable: false,
                error: null,
                path: path,
                isDir: false,
            }
            // Announce the task right away, before its deletion is subscribed.
            initQueueTask$.next(initTask)
            // Cancel events addressed to this task only. The same stream
            // drives both the actual stop of the deletion and the Cancelled
            // state, so the two can no longer disagree.
            const cancelThisTask$ = cancel$.pipe(
                filter(isID),
                filter((x) => x === id),
            )
            const progress$ = deleteFile({
                setDeletingFileIds,
                fileIds,
                drop,
                completeDropFile$,
                cancel$: cancelThisTask$,
            }).pipe(
                terminalActions<Partial<TransferTaskStatus>>(
                    cancelThisTask$.pipe(
                        mapTo({
                            state: TaskState.Cancelled,
                        }),
                    ),
                    error$.pipe(
                        map((error) => ({ state: TaskState.Error, error })),
                    ),
                ),
                stateScan<TransferTaskStatus>(initTask),
                // Emit the Cancelled status itself, then end the task.
                takeWhile((x) => x.state !== TaskState.Cancelled, true),
            )
            return progress$
        }),
        // Concurrency 1: delete tasks run strictly one after another.
        mergeMap((task) => task, 1),
    )
    return merge(task$, initQueueTask$).pipe(stateStore(remove$), mapToArray())
}

export type { IremoveBackgroundImageProcess, IDeleteProcess }
