import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import { difference } from 'lodash-es';
import { LiveData } from '../../livedata';
import { MANUALLY_STOP } from '../../utils';
import { BlobStorageOverCapacity } from './error';
// Module-level logger used by everything in this file; created with the 'affine:blob-engine' identifier.
const logger = new DebugLogger('affine:blob-engine');
/**
 * Keeps one local blob storage and a list of remote blob storages in step.
 *
 * Reads look through the local storage first and then every remote; writes go
 * to the local storage and are then pushed to every writable remote in the
 * background. `start()` runs a periodic `sync()` that copies missing blobs in
 * both directions.
 */
export class BlobEngine {
    /**
     * @param local - primary storage; must expose `name`, `readonly`, `get`, `set` and `list`.
     * @param remotes - array of storages with the same shape as `local`.
     */
    constructor(local, remotes){
        this.local = local;
        this.remotes = remotes;
        this.name = 'blob-engine';
        // Snapshot of the local storage's readonly flag taken at construction time.
        this.readonly = this.local.readonly;
        // AbortController of the running sync loop, or null while the loop is stopped.
        this.abort = null;
        // Flipped to true when a download into local fails with BlobStorageOverCapacity.
        this.isStorageOverCapacity$ = new LiveData(false);
        // Upper bound compared against `value.size` in set(); 100 MiB assuming size is in bytes.
        this.singleBlobSizeLimit = 100 * 1024 * 1024;
        // Emits the rejected value whenever set() refuses a blob above the size limit.
        this.onAbortLargeBlob = new Slot();
    }
    /**
     * Starts the background sync loop: one sync() immediately, then another
     * 60 seconds after each pass settles. Does nothing when the loop is already
     * running or the over-capacity flag is set.
     */
    start() {
        if (this.abort || this.isStorageOverCapacity$.value) {
            return;
        }
        this.abort = new AbortController();
        const abortSignal = this.abort.signal;
        let timer = null;
        // Cancel the pending re-run as soon as the loop is stopped so no timer lingers.
        abortSignal.addEventListener('abort', ()=>{
            if (timer !== null) {
                clearTimeout(timer);
                timer = null;
            }
        }, {
            once: true
        });
        const sync = ()=>{
            timer = null;
            if (abortSignal.aborted) {
                return;
            }
            this.sync().catch((error)=>{
                logger.error('sync blob error', error);
            }).finally(()=>{
                // stop() may have been called while this pass was in flight; do not re-arm then.
                if (abortSignal.aborted) {
                    return;
                }
                timer = setTimeout(sync, 60000);
            });
        };
        sync();
    }
    /**
     * Stops the background sync loop (abort reason: MANUALLY_STOP) and allows
     * a later start() to create a new one.
     */
    stop() {
        this.abort?.abort(MANUALLY_STOP);
        this.abort = null;
    }
    /** All storages in lookup order: local first, then the remotes. */
    get storages() {
        return [
            this.local,
            ...this.remotes
        ];
    }
    /**
     * Runs one sync pass against every remote, one remote after another.
     *
     * For a writable remote, keys present only locally are uploaded and keys
     * present only remotely are downloaded. Failures for a single key are
     * logged and do not stop the pass; a failure while listing skips that
     * remote. Returns immediately when the local storage is readonly.
     *
     * @returns {Promise<void>}
     */
    async sync() {
        if (this.local.readonly) {
            return;
        }
        logger.debug('start syncing blob...');
        for (const remote of this.remotes){
            let localList = [];
            let remoteList = [];
            if (!remote.readonly) {
                try {
                    localList = await this.local.list();
                    remoteList = await remote.list();
                } catch (err) {
                    logger.error(`error when sync`, err);
                    continue;
                }
                const needUpload = difference(localList, remoteList);
                for (const key of needUpload){
                    try {
                        const data = await this.local.get(key);
                        if (data) {
                            await remote.set(key, data);
                        }
                    } catch (err) {
                        logger.error(`error when sync ${key} from [${this.local.name}] to [${remote.name}]`, err);
                    }
                }
            }
            // NOTE(review): for a readonly remote neither list is fetched above, so both
            // stay empty and nothing is downloaded from it in this pass - confirm intended.
            const needDownload = difference(remoteList, localList);
            for (const key of needDownload){
                try {
                    const data = await remote.get(key);
                    if (data) {
                        await this.local.set(key, data);
                    }
                } catch (err) {
                    if (err instanceof BlobStorageOverCapacity) {
                        // Recorded so start() refuses to launch a new loop while over capacity.
                        this.isStorageOverCapacity$.value = true;
                    }
                    logger.error(`error when sync ${key} from [${remote.name}] to [${this.local.name}]`, err);
                }
            }
        }
        logger.debug('finish syncing blob');
    }
    /**
     * Returns the first truthy result of `get(key)` across the storages
     * (local first), or null when none has it. An error from any storage
     * consulted propagates to the caller.
     *
     * @param key - blob key
     */
    async get(key) {
        logger.debug('get blob', key);
        for (const storage of this.storages){
            const data = await storage.get(key);
            if (data) {
                return data;
            }
        }
        return null;
    }
    /**
     * Stores a blob locally, then uploads it to every writable remote in the
     * background (the returned promise does not wait for the uploads).
     *
     * A value whose `size` exceeds `singleBlobSizeLimit` is not stored at all:
     * it is emitted on `onAbortLargeBlob` and the key is returned unchanged.
     *
     * @param key - blob key
     * @param value - blob payload; its `size` property is compared to the limit
     * @returns {Promise<*>} resolves to `key`
     * @throws {Error} when the local storage is readonly, or when the local write fails
     */
    async set(key, value) {
        if (this.local.readonly) {
            throw new Error('local peer is readonly');
        }
        if (value.size > this.singleBlobSizeLimit) {
            this.onAbortLargeBlob.emit(value);
            logger.error('blob over limit, abort set');
            return key;
        }
        await this.local.set(key, value);
        // The async wrapper turns a synchronous throw from peer.set into a rejection,
        // so a misbehaving remote cannot fail set() after the local write succeeded.
        const uploads = this.remotes.filter((r)=>!r.readonly).map(async (peer)=>{
            try {
                await peer.set(key, value);
            } catch (err) {
                logger.error('Error when uploading to peer', err);
                // Rethrow so allSettled reports this peer as rejected and the summary below is accurate.
                throw err;
            }
        });
        // Fire-and-forget: allSettled never rejects; the trailing catch only guards the logging callback.
        Promise.allSettled(uploads).then((result)=>{
            if (result.some(({ status })=>status === 'rejected')) {
                logger.error(`blob ${key} update finish, but some peers failed to update`);
            } else {
                logger.debug(`blob ${key} update finish`);
            }
        }).catch(()=>{});
        return key;
    }
    /** Deliberate no-op: the engine does not delete blobs from any storage. */
    async delete(_key) {}
    /**
     * Lists the de-duplicated union of keys from every storage, local first.
     * An error from any storage's list() propagates to the caller.
     *
     * @returns {Promise<Array>} keys in first-seen order
     */
    async list() {
        const blobList = new Set();
        for (const peer of this.storages){
            const list = await peer.list();
            if (list) {
                for (const blob of list){
                    blobList.add(blob);
                }
            }
        }
        return Array.from(blobList);
    }
}
/**
 * Read-only placeholder storage that never holds anything: lookups resolve to
 * null, listing resolves to an empty array, and writes/deletes reject with
 * 'not supported'.
 */
export const EmptyBlobStorage = {
    name: 'empty',
    readonly: true,
    // Nothing is ever stored, so every lookup comes back empty.
    get: (_key)=>Promise.resolve(null),
    // Mutations are refused with a rejected promise rather than a synchronous throw.
    set: (_key, _value)=>Promise.reject(new Error('not supported')),
    delete: (_key)=>Promise.reject(new Error('not supported')),
    list: ()=>Promise.resolve([])
};
/**
 * Blob storage backed by a caller-supplied `state` object exposing `get(key)`
 * and `set(key, value)`. The set of stored keys is kept in that same state
 * under the 'list' entry.
 */
export class MemoryBlobStorage {
    constructor(state){
        this.name = 'testing';
        this.readonly = false;
        this.state = state;
    }
    /** Resolves to the stored value, or null when the entry is null/undefined. */
    get(key) {
        const stored = this.state.get(key);
        return Promise.resolve(stored ?? null);
    }
    /** Stores the value, records the key in the 'list' entry, and resolves to the key. */
    set(key, value) {
        const { state } = this;
        state.set(key, value);
        const knownKeys = state.get('list') ?? new Set();
        knownKeys.add(key);
        state.set('list', knownKeys);
        return Promise.resolve(key);
    }
    /** Overwrites the entry with null and removes the key from the 'list' entry. */
    delete(key) {
        const { state } = this;
        state.set(key, null);
        const knownKeys = state.get('list') ?? new Set();
        knownKeys.delete(key);
        state.set('list', knownKeys);
        return Promise.resolve();
    }
    /** Resolves to an array copy of the recorded keys (empty when none were recorded). */
    list() {
        const knownKeys = this.state.get('list');
        if (!knownKeys) {
            return Promise.resolve([]);
        }
        return Promise.resolve(Array.from(knownKeys));
    }
}
