/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.gremlin.result.processor;

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.FrontendConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.exception.FrontendException;
import com.alibaba.graphscope.common.result.ResultParser;
import com.alibaba.graphscope.common.utils.ClassUtils;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.result.GroupResultParser;
import com.alibaba.graphscope.proto.frontend.Code;
import com.alibaba.pegasus.common.StreamIterator;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.google.common.collect.Lists;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.List;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;

/**
 * Base result processor that buffers {@link PegasusClient.JobResponse} messages in a
 * {@link StreamIterator} and, when {@link #request()} is called, drains that iterator, parses
 * every response with the configured {@link ResultParser} and writes the parsed results back
 * through the Gremlin {@link Context} in batches.
 *
 * <p>Subclasses implement {@link #aggregateResults()}, which is invoked right before every
 * batch (partial or final) is written.
 */
public abstract class AbstractResultProcessor
        extends StandardOpProcessor
        implements ResultProcessor {
    /**
     * Upper bound for the <em>initial capacity</em> of {@link #resultCollectors}. The batch size
     * can come from the request message, so it must not be used unchecked as an allocation size.
     * This does not limit how many results a batch may hold.
     */
    private static final int MAX_INITIAL_COLLECTOR_CAPACITY = 1024;

    protected final Context writeResult;
    protected final ResultParser resultParser;
    protected final QueryStatusCallback statusCallback;
    protected final QueryTimeoutConfig timeoutConfig;
    /** Parsed results accumulated since the last batch was written. */
    protected final List<Object> resultCollectors;
    /** Number of collected results at which a partial batch is written. */
    protected final int resultCollectorsBatchSize;
    protected final StreamIterator<PegasusClient.JobResponse> responseStreamIterator;

    /**
     * Creates a processor bound to one request.
     *
     * @param configs configuration from which the stream buffer capacity
     *     ({@code PER_QUERY_STREAM_BUFFER_MAX_CAPACITY}) is read
     * @param writeResult Gremlin context used to read the request message/settings and to write
     *     response messages
     * @param resultParser parser applied to every job response
     * @param statusCallback callback notified when processing ends successfully or with an error
     * @param timeoutConfig timeout configuration used when building error information
     */
    protected AbstractResultProcessor(
            Configs configs,
            Context writeResult,
            ResultParser resultParser,
            QueryStatusCallback statusCallback,
            QueryTimeoutConfig timeoutConfig) {
        this.writeResult = writeResult;
        this.resultParser = resultParser;
        this.statusCallback = statusCallback;
        this.timeoutConfig = timeoutConfig;

        RequestMessage msg = writeResult.getRequestMessage();
        Settings settings = writeResult.getSettings();
        // The explicit cast is required: the Optional returned by optionalArgs is not typed as
        // Integer at this call site, so orElse(...) cannot be assigned to an int directly.
        // Falls back to the server-side default when the request carries no "batchSize" argument.
        this.resultCollectorsBatchSize =
                (Integer) msg.optionalArgs("batchSize").orElse(settings.resultIterationBatchSize);

        // Clamp only the pre-allocation: a negative capacity makes ArrayList throw and a very
        // large one would allocate a huge backing array up front. The list still grows on demand.
        int initialCapacity =
                Math.max(0, Math.min(this.resultCollectorsBatchSize, MAX_INITIAL_COLLECTOR_CAPACITY));
        this.resultCollectors = new ArrayList<>(initialCapacity);
        this.responseStreamIterator =
                new StreamIterator<>(
                        FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs).intValue());
    }

    /**
     * Puts one job response into the response stream iterator.
     *
     * @param response the response to enqueue
     * @throws RuntimeException wrapping any exception raised while enqueuing; if that exception
     *     is an {@link InterruptedException} the thread's interrupt status is restored first
     */
    public synchronized void process(PegasusClient.JobResponse response) {
        try {
            this.responseStreamIterator.putData(response);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks the response stream iterator as finished.
     *
     * @throws RuntimeException wrapping any exception raised by the iterator; if that exception
     *     is an {@link InterruptedException} the thread's interrupt status is restored first
     */
    public synchronized void finish() {
        try {
            this.responseStreamIterator.finish();
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fails the response stream iterator with the exception form of the given gRPC status.
     *
     * @param status the error status to propagate
     */
    public synchronized void error(Status status) {
        this.responseStreamIterator.fail(status.asException());
    }

    /**
     * Drains the response stream iterator, writing results to the client in batches, and closes
     * the iterator when done.
     *
     * <p>Any {@link Throwable} raised while processing is converted into an exception (a
     * {@code TIMEOUT} {@link FrontendException} when the cause is an
     * {@link InterruptedException}, otherwise whatever
     * {@code ClassUtils.handleExecutionException} returns), reported through
     * {@code statusCallback.onErrorEnd} and sent to the client as a {@code SERVER_ERROR}
     * response. The throwable is not rethrown.
     */
    public void request() {
        try {
            BatchResponseProcessor responseProcessor = new BatchResponseProcessor();
            while (this.responseStreamIterator.hasNext()) {
                responseProcessor.process(
                        (PegasusClient.JobResponse) this.responseStreamIterator.next());
            }
            responseProcessor.finish();
            this.statusCallback
                    .getQueryLogger()
                    .info("[compile]: process results success", new Object[0]);
        } catch (Throwable t) {
            Exception executionException;
            if (t.getCause() instanceof InterruptedException) {
                executionException =
                        new FrontendException(
                                Code.TIMEOUT,
                                ClassUtils.getTimeoutError(
                                        "Timeout has been detected by gremlin executor",
                                        this.timeoutConfig),
                                t);
            } else {
                executionException = ClassUtils.handleExecutionException(t, this.timeoutConfig);
            }
            if (executionException instanceof FrontendException) {
                // Attach the query id so the error can be correlated with the query log.
                ((FrontendException) executionException)
                        .getDetails()
                        .put("QueryId", this.statusCallback.getQueryLogger().getQueryId());
            }
            String errorMsg = executionException.getMessage();
            this.statusCallback.onErrorEnd(executionException, errorMsg);
            this.writeResult.writeAndFlush(
                    ResponseMessage.build(this.writeResult.getRequestMessage())
                            .code(ResponseStatusCode.SERVER_ERROR)
                            .statusMessage(errorMsg)
                            .create());
        } finally {
            // Always close the iterator, whether processing succeeded or failed.
            this.responseStreamIterator.close();
        }
    }

    /**
     * Hook invoked right before each batch of {@link #resultCollectors} is written to the
     * client (both for partial batches and for the final one).
     */
    protected abstract void aggregateResults();

    /** Re-sets the current thread's interrupt flag when {@code e} is an interruption. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Collects parsed results into {@link #resultCollectors} and writes them out in batches of
     * {@link #resultCollectorsBatchSize}.
     */
    private class BatchResponseProcessor {
        private BatchResponseProcessor() {
        }

        /**
         * Flushes the currently collected results as a {@code PARTIAL_CONTENT} message if the
         * batch threshold has been reached (never for {@link GroupResultParser}), then parses
         * {@code response} and appends its results to the collector.
         *
         * @param response the job response to parse and collect
         */
        public void process(PegasusClient.JobResponse response) {
            boolean batchFull = resultCollectors.size() >= resultCollectorsBatchSize;
            if (batchFull && !(resultParser instanceof GroupResultParser)) {
                aggregateResults();
                // Send a copy: the collector is cleared right after the write is issued.
                writeResult.writeAndFlush(
                        ResponseMessage.build(writeResult.getRequestMessage())
                                .code(ResponseStatusCode.PARTIAL_CONTENT)
                                .result(Lists.newArrayList(resultCollectors))
                                .create());
                resultCollectors.clear();
            }
            resultCollectors.addAll(
                    ClassUtils.callException(
                            () -> resultParser.parseFrom(response),
                            Code.GREMLIN_INVALID_RESULT));
        }

        /**
         * Signals success to the status callback and writes the remaining collected results as
         * the final {@code SUCCESS} message.
         */
        public void finish() {
            // NOTE(review): success is reported before the final aggregate/write; if either of
            // those throws, request() will additionally report onErrorEnd for the same query.
            // Order kept as-is - confirm whether this is intended.
            statusCallback.onSuccessEnd();
            aggregateResults();
            writeResult.writeAndFlush(
                    ResponseMessage.build(writeResult.getRequestMessage())
                            .code(ResponseStatusCode.SUCCESS)
                            .result(resultCollectors)
                            .create());
        }
    }
}

