import { openDB } from 'idb';
import { merge, Observable, of, throttleTime } from 'rxjs';
import { fromPromise } from '../../../../livedata';
import { throwIfAborted } from '../../../../utils';
import { exhaustMapWithTrailing } from '../../../../utils/';
export class IndexedDBJobQueue {
    constructor(databaseName = 'jobs'){
        this.databaseName = databaseName;
        this.database = null;
        this.broadcast = new BroadcastChannel('idb-job-queue:' + this.databaseName);
        this.TIMEOUT = 1000 * 30;
    }
    async enqueue(jobs) {
        await this.ensureInitialized();
        const trx = this.database.transaction([
            'jobs'
        ], 'readwrite');
        for (const job of jobs){
            await trx.objectStore('jobs').add({
                batchKey: job.batchKey,
                payload: job.payload,
                startTime: null
            });
        }
        trx.commit();
        this.broadcast.postMessage('new-jobs');
    }
    async accept() {
        await this.ensureInitialized();
        const jobs = [];
        const trx = this.database.transaction([
            'jobs'
        ], 'readwrite');
        if (jobs.length === 0) {
            const batchKeys = trx.objectStore('jobs').index('batchKey').iterate();
            let currentBatchKey = null;
            let currentBatchJobs = [];
            let skipCurrentBatch = false;
            for await (const item of batchKeys){
                if (item.value.batchKey !== currentBatchKey) {
                    if (!skipCurrentBatch && currentBatchJobs.length > 0) {
                        break;
                    }
                    currentBatchKey = item.value.batchKey;
                    currentBatchJobs = [];
                    skipCurrentBatch = false;
                }
                if (skipCurrentBatch) {
                    continue;
                }
                if (this.isAcceptable(item.value)) {
                    currentBatchJobs.push({
                        id: item.primaryKey,
                        job: item.value
                    });
                } else {
                    skipCurrentBatch = true;
                }
            }
            if (skipCurrentBatch === false && currentBatchJobs.length > 0) {
                jobs.push(...currentBatchJobs);
            }
        }
        for (const { id, job } of jobs){
            const startTime = Date.now();
            await trx.objectStore('jobs').put({
                ...job,
                startTime
            }, id);
        }
        if (jobs.length === 0) {
            return null;
        }
        return jobs.map(({ id, job })=>({
                id: id.toString(),
                batchKey: job.batchKey,
                payload: job.payload
            }));
    }
    async waitForAccept(signal) {
        const broadcast = new BroadcastChannel('idb-job-queue:' + this.databaseName);
        try {
            let deferred = Promise.withResolvers();
            broadcast.onmessage = ()=>{
                deferred.resolve();
            };
            while(throwIfAborted(signal)){
                const jobs = await this.accept();
                if (jobs !== null) {
                    return jobs;
                }
                await Promise.race([
                    deferred.promise,
                    new Promise((resolve)=>{
                        setTimeout(resolve, 5000);
                    }),
                    new Promise((_, reject)=>{
                        if (signal?.aborted) {
                            reject(signal.reason);
                        }
                        signal?.addEventListener('abort', ()=>{
                            reject(signal.reason);
                        });
                    })
                ]);
                deferred = Promise.withResolvers();
            }
            return [];
        } finally{
            broadcast.close();
        }
    }
    async complete(jobs) {
        await this.ensureInitialized();
        const trx = this.database.transaction([
            'jobs'
        ], 'readwrite');
        for (const { id } of jobs){
            await trx.objectStore('jobs').delete(typeof id === 'string' ? parseInt(id) : id);
        }
        trx.commit();
        this.broadcast.postMessage('job-completed');
    }
    async return(jobs, retry = false) {
        await this.ensureInitialized();
        const trx = this.database.transaction([
            'jobs'
        ], 'readwrite');
        for (const { id } of jobs){
            if (retry) {
                const nid = typeof id === 'string' ? parseInt(id) : id;
                const job = await trx.objectStore('jobs').get(nid);
                if (job) {
                    await trx.objectStore('jobs').put({
                        ...job,
                        startTime: null
                    }, nid);
                }
            } else {
                await trx.objectStore('jobs').delete(typeof id === 'string' ? parseInt(id) : id);
            }
        }
        trx.commit();
        this.broadcast.postMessage('job-completed');
    }
    async clear() {
        await this.ensureInitialized();
        const trx = this.database.transaction([
            'jobs'
        ], 'readwrite');
        await trx.objectStore('jobs').clear();
    }
    async ensureInitialized() {
        if (!this.database) {
            await this.initialize();
        }
    }
    async initialize() {
        if (this.database) {
            return;
        }
        this.database = await openDB(this.databaseName, 1, {
            upgrade (database) {
                const jobs = database.createObjectStore('jobs', {
                    autoIncrement: true
                });
                jobs.createIndex('batchKey', 'batchKey');
            }
        });
    }
    isTimeout(job) {
        return job.startTime !== null && job.startTime + this.TIMEOUT < Date.now();
    }
    isAcceptable(job) {
        return job.startTime === null || this.isTimeout(job);
    }
    get status$() {
        return merge(of(1), new Observable((subscriber)=>{
            const broadcast = new BroadcastChannel('idb-job-queue:' + this.databaseName);
            broadcast.onmessage = ()=>{
                subscriber.next(1);
            };
            return ()=>{
                broadcast.close();
            };
        })).pipe(throttleTime(300, undefined, {
            leading: true,
            trailing: true
        }), exhaustMapWithTrailing(()=>fromPromise(async ()=>{
                await this.ensureInitialized();
                const trx = this.database.transaction([
                    'jobs'
                ], 'readonly');
                const remaining = await trx.objectStore('jobs').count();
                return {
                    remaining
                };
            })));
    }
}
