import { DebugLogger } from '@affine/debug';
import { ErrorNames, UserFriendlyError } from '@affine/graphql';
import { throwIfAborted } from '@toeverything/infra';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
// Test hook: a promise stored on `window` that `pullDoc` awaits before issuing
// each request. It is initialised to an already-resolved promise here, so it
// adds no delay unless something replaces it.
// NOTE(review): presumably tests swap in a pending promise to simulate sync
// lag — confirm against the test suite.
window._TEST_SIMULATE_SYNC_LAG = Promise.resolve();
// Module-scoped logger used for the error reports emitted by the class below.
const logger = new DebugLogger('affine-cloud-doc-engine-server');
/**
 * Socket-backed client for synchronising the documents of a single workspace.
 *
 * Every request is sent with `spaceType: 'workspace'` and the workspace id
 * given to the constructor. Requests that expect an acknowledgement are sent
 * through `socket.timeout(SEND_TIMEOUT).emitWithAck(...)`; the join handshake
 * is the one exception and is sent without a timeout.
 */
export class CloudDocEngineServer {
    /**
     * @param workspaceId Id of the workspace whose docs are synchronised.
     * @param webSocketService Object exposing `connect()`, which must return
     *   `{ socket, dispose }`. `dispose` is invoked from {@link dispose}.
     */
    constructor(workspaceId, webSocketService){
        this.workspaceId = workspaceId;
        // Single interruption callback; a later `onInterrupted` call replaces it.
        this.interruptCb = null;
        // Value handed to `socket.timeout()` for every acknowledged request.
        this.SEND_TIMEOUT = 30000;
        /** Registers (or replaces) the callback invoked when the connection is interrupted. */
        this.onInterrupted = (cb)=>{
            this.interruptCb = cb;
        };
        /** Forwards an interruption reason to the registered callback, if any. */
        this.handleInterrupted = (reason)=>{
            this.interruptCb?.(reason);
        };
        /** Socket `disconnect` listener: reports the disconnect reason as an interruption. */
        this.handleDisconnect = (reason)=>{
            this.interruptCb?.(reason);
        };
        /** Socket `server-version-rejected` listener: reports a fixed interruption reason. */
        this.handleVersionRejected = ()=>{
            this.interruptCb?.('Client version rejected');
        };
        const { socket, dispose } = webSocketService.connect();
        this.socket = socket;
        this.disposeSocket = dispose;
    }
    /**
     * Joins the workspace space on the server and waits for the acknowledgement.
     * The acknowledgement payload is not inspected.
     */
    async clientHandShake() {
        await this.socket.emitWithAck('space:join', {
            spaceType: 'workspace',
            spaceId: this.workspaceId,
            clientVersion: BUILD_CONFIG.appVersion
        });
    }
    /**
     * Requests a doc from the server.
     *
     * @param docId Id of the doc to load.
     * @param state Optional local state vector (Uint8Array); sent base64-encoded
     *   when truthy, omitted otherwise.
     * @returns `null` when the server answers with a DOC_NOT_FOUND error,
     *   otherwise `{ data, stateVector, serverClock }` decoded from the response.
     * @throws {UserFriendlyError} For any other error response.
     */
    async pullDoc(docId, state) {
        // Test hook: resolves immediately unless it has been replaced.
        await window._TEST_SIMULATE_SYNC_LAG;
        const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
        const response = await this.socket.timeout(this.SEND_TIMEOUT).emitWithAck('space:load-doc', {
            spaceType: 'workspace',
            spaceId: this.workspaceId,
            docId: docId,
            stateVector
        });
        if ('error' in response) {
            const error = new UserFriendlyError(response.error);
            // A missing doc is an expected outcome, not a failure.
            if (error.name === ErrorNames.DOC_NOT_FOUND) {
                return null;
            }
            throw error;
        }
        return {
            data: base64ToUint8Array(response.data.missing),
            stateVector: response.data.state ? base64ToUint8Array(response.data.state) : undefined,
            serverClock: response.data.timestamp
        };
    }
    /**
     * Pushes one update for a doc to the server.
     *
     * @param docId Id of the doc being updated.
     * @param data Update bytes (Uint8Array); sent as a single base64 string
     *   inside the `updates` array.
     * @returns `{ serverClock }` taken from the response timestamp.
     * @throws {UserFriendlyError} When the server answers with an error (also logged).
     */
    async pushDoc(docId, data) {
        const payload = await uint8ArrayToBase64(data);
        const response = await this.socket.timeout(this.SEND_TIMEOUT).emitWithAck('space:push-doc-updates', {
            spaceType: 'workspace',
            spaceId: this.workspaceId,
            docId: docId,
            updates: [
                payload
            ]
        });
        if ('error' in response) {
            logger.error('client-update-v2 error', {
                workspaceId: this.workspaceId,
                guid: docId,
                response
            });
            throw new UserFriendlyError(response.error);
        }
        return {
            serverClock: response.data.timestamp
        };
    }
    /**
     * Loads doc timestamps from the server.
     *
     * @param after Value sent as `timestamp` in the request.
     * @returns A `Map` built from the own enumerable entries of `response.data`.
     * @throws {UserFriendlyError} When the server answers with an error (also logged).
     */
    async loadServerClock(after) {
        const response = await this.socket.timeout(this.SEND_TIMEOUT).emitWithAck('space:load-doc-timestamps', {
            spaceType: 'workspace',
            spaceId: this.workspaceId,
            timestamp: after
        });
        if ('error' in response) {
            logger.error('client-pre-sync error', {
                workspaceId: this.workspaceId,
                response
            });
            throw new UserFriendlyError(response.error);
        }
        return new Map(Object.entries(response.data));
    }
    /**
     * Subscribes to doc updates broadcast for this workspace.
     *
     * @param cb Called once per update with `{ docId, data, serverClock }`.
     *   Messages for other spaces are ignored.
     * @returns A function that removes the subscription.
     */
    async subscribeAllDocs(cb) {
        const handleUpdate = async (message)=>{
            if (message.spaceType === 'workspace' && message.spaceId === this.workspaceId) {
                message.updates.forEach((update)=>{
                    cb({
                        docId: message.docId,
                        data: base64ToUint8Array(update),
                        serverClock: message.timestamp
                    });
                });
            }
        };
        this.socket.on('space:broadcast-doc-updates', handleUpdate);
        return ()=>{
            this.socket.off('space:broadcast-doc-updates', handleUpdate);
        };
    }
    /**
     * Ensures the socket is connected and joined to the workspace.
     *
     * Installs the interruption listeners, connects the socket when it is not
     * connected yet, then performs the join handshake.
     *
     * @param signal AbortSignal used to cancel the wait for the connection.
     * @throws Whatever `throwIfAborted` throws when `signal` is already aborted;
     *   rejects with the string `'aborted'` when the signal fires while waiting
     *   for the socket to connect.
     */
    async waitForConnectingServer(signal) {
        // Remove before adding so repeated calls never stack duplicate
        // listeners; `disconnectServer` only removes one registration of each.
        this.socket.off('server-version-rejected', this.handleVersionRejected);
        this.socket.off('disconnect', this.handleDisconnect);
        this.socket.on('server-version-rejected', this.handleVersionRejected);
        this.socket.on('disconnect', this.handleDisconnect);
        throwIfAborted(signal);
        if (!this.socket.connected) {
            this.socket.connect();
            await new Promise((resolve, reject)=>{
                // Both listeners are one-shot: whichever fires first removes
                // the pair so nothing lingers on the socket or the signal.
                const cleanup = ()=>{
                    this.socket.off('connect', onConnect);
                    signal.removeEventListener('abort', onAbort);
                };
                const onConnect = ()=>{
                    cleanup();
                    resolve();
                };
                const onAbort = ()=>{
                    cleanup();
                    // Kept as a plain string: existing callers may compare
                    // against this exact rejection value.
                    reject('aborted');
                };
                this.socket.on('connect', onConnect);
                signal.addEventListener('abort', onAbort);
            });
            throwIfAborted(signal);
        }
        await this.clientHandShake();
    }
    /**
     * Tells the server this client leaves the workspace and removes the
     * interruption listeners installed by `waitForConnectingServer`.
     */
    disconnectServer() {
        this.socket.emit('space:leave', {
            spaceType: 'workspace',
            spaceId: this.workspaceId
        });
        this.socket.off('server-version-rejected', this.handleVersionRejected);
        this.socket.off('disconnect', this.handleDisconnect);
    }
    /** Leaves the workspace and releases the socket obtained in the constructor. */
    dispose() {
        this.disconnectServer();
        this.disposeSocket();
    }
}
