/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.gremlin.plugin.processor;

import com.alibaba.graphscope.common.IrPlan;
import com.alibaba.graphscope.common.client.RpcBroadcastProcessor;
import com.alibaba.graphscope.common.client.RpcChannelFetcher;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.intermediate.InterOpCollection;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
import com.alibaba.graphscope.common.store.IrMeta;
import com.alibaba.graphscope.common.store.IrMetaFetcher;
import com.alibaba.graphscope.gremlin.InterOpCollectionBuilder;
import com.alibaba.graphscope.gremlin.Utils;
import com.alibaba.graphscope.gremlin.plugin.strategy.ExpandFusionStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.RemoveUselessStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.ScanFusionStepStrategy;
import com.alibaba.graphscope.gremlin.result.GremlinResultAnalyzer;
import com.alibaba.graphscope.gremlin.result.GremlinResultProcessor;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.codahale.metrics.Timer;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.script.Bindings;
import javax.script.SimpleBindings;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.codehaus.groovy.control.MultipleCompilationErrorsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IrStandardOpProcessor
extends StandardOpProcessor {
    private static Logger metricLogger = LoggerFactory.getLogger((String)"MetricLog");
    private static Logger logger = LoggerFactory.getLogger(IrStandardOpProcessor.class);
    protected static final AtomicLong JOB_ID_COUNTER = new AtomicLong(0L);
    protected Graph graph;
    protected GraphTraversalSource g;
    protected Configs configs;
    protected RpcBroadcastProcessor broadcastProcessor;
    protected IrMetaFetcher irMetaFetcher;
    protected IrMetaQueryCallback metaQueryCallback;

    public IrStandardOpProcessor(Configs configs, IrMetaFetcher irMetaFetcher, RpcChannelFetcher fetcher, IrMetaQueryCallback metaQueryCallback, Graph graph, GraphTraversalSource g) {
        this.graph = graph;
        this.g = g;
        this.configs = configs;
        this.irMetaFetcher = irMetaFetcher;
        this.broadcastProcessor = new RpcBroadcastProcessor(fetcher);
        this.metaQueryCallback = metaQueryCallback;
    }

    protected void evalOpInternal(Context ctx, Supplier<GremlinExecutor> gremlinExecutorSupplier, AbstractEvalOpProcessor.BindingSupplier bindingsSupplier) {
        long startTime = System.currentTimeMillis();
        Timer.Context timerContext = evalOpTimer.time();
        RequestMessage msg = ctx.getRequestMessage();
        GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
        Map args = msg.getArgs();
        String script = (String)args.get("gremlin");
        String language = "antlr-to-java";
        SimpleBindings bindings = new SimpleBindings();
        long jobId = JOB_ID_COUNTER.incrementAndGet();
        GremlinExecutor.LifeCycle lifeCycle = this.createLifeCycle(ctx, gremlinExecutorSupplier, bindingsSupplier, jobId, script);
        try {
            CompletableFuture evalFuture = gremlinExecutor.eval(script, language, (Bindings)bindings, lifeCycle);
            evalFuture.handle((v, t) -> {
                long elapsed = timerContext.stop();
                logger.info("query \"{}\" total execution time is {} ms", (Object)script, (Object)Float.valueOf((float)elapsed / 1000000.0f));
                boolean isSuccess = t == null;
                metricLogger.info("{} | {} | {} | {} | {}", new Object[]{jobId, script, isSuccess, Float.valueOf((float)elapsed / 1000000.0f), startTime});
                if (t != null) {
                    Optional possibleTemporaryException = IrStandardOpProcessor.determineIfTemporaryException((Throwable)t);
                    if (possibleTemporaryException.isPresent()) {
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_TEMPORARY).statusMessage(((Throwable)possibleTemporaryException.get()).getMessage()).statusAttributeException((Throwable)possibleTemporaryException.get()).create());
                    } else if (t instanceof OpProcessorException) {
                        ctx.writeAndFlush(((OpProcessorException)((Object)t)).getResponseMessage());
                    } else if (t instanceof TimedInterruptTimeoutException) {
                        String errorMessage = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider", msg);
                        logger.warn(errorMessage);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider").statusAttributeException(t).create());
                    } else if (t instanceof TimeoutException) {
                        String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]", msg);
                        logger.warn(errorMessage, t);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(t.getMessage()).statusAttributeException(t).create());
                    } else if (t instanceof MultipleCompilationErrorsException && t.getMessage().contains("Method too large") && ((MultipleCompilationErrorsException)t).getErrorCollector().getErrorCount() == 1) {
                        String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", msg);
                        logger.warn(errorMessage);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION).statusMessage(errorMessage).statusAttributeException(t).create());
                    } else {
                        String errorMessage = t.getMessage() == null ? t.toString() : t.getMessage();
                        logger.warn(String.format("Exception processing a script on request [%s].", msg), t);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION).statusMessage(errorMessage).statusAttributeException(t).create());
                    }
                }
                return null;
            });
        }
        catch (RejectedExecutionException var17) {
            ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.TOO_MANY_REQUESTS).statusMessage("Rate limiting").create());
        }
    }

    protected GremlinExecutor.LifeCycle createLifeCycle(Context ctx, Supplier<GremlinExecutor> gremlinExecutorSupplier, AbstractEvalOpProcessor.BindingSupplier bindingsSupplier, long jobId, String script) {
        RequestMessage msg = ctx.getRequestMessage();
        Settings settings = ctx.getSettings();
        Map args = msg.getArgs();
        long seto = args.containsKey("evaluationTimeout") ? ((Number)args.get("evaluationTimeout")).longValue() : settings.getEvaluationTimeout();
        return GremlinExecutor.LifeCycle.build().evaluationTimeoutOverride(Long.valueOf(seto)).beforeEval(b -> {
            try {
                b.putAll(bindingsSupplier.get());
                b.put("graph", (Object)this.graph);
                b.put("g", (Object)this.g);
            }
            catch (OpProcessorException ope) {
                throw new RuntimeException(ope);
            }
        }).transformResult(o -> {
            if (o != null && o instanceof Traversal) {
                IrStandardOpProcessor.applyStrategies((Traversal)o);
            }
            return o;
        }).withResult(o -> {
            try {
                if (o != null && o instanceof Traversal) {
                    Traversal traversal = (Traversal)o;
                    this.processTraversal(traversal, new GremlinResultProcessor(ctx, GremlinResultAnalyzer.analyze(traversal)), jobId, script);
                }
            }
            catch (InvalidProtocolBufferException e) {
                throw new RuntimeException(e);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).create();
    }

    protected void processTraversal(Traversal traversal, ResultProcessor resultProcessor, long jobId, String script) throws InvalidProtocolBufferException, IOException, RuntimeException {
        IrMeta irMeta = this.metaQueryCallback.beforeExec();
        InterOpCollection opCollection = new InterOpCollectionBuilder(traversal).build();
        InterOpCollection.applyStrategies(opCollection);
        InterOpCollection.process(opCollection);
        String jobName = "ir_plan_" + jobId;
        IrPlan irPlan = new IrPlan(irMeta, opCollection);
        logger.info("gremlin query \"{}\", job conf name \"{}\", ir plan {}", new Object[]{script, jobName, irPlan.getPlanAsJson()});
        byte[] physicalPlanBytes = irPlan.toPhysicalBytes(this.configs);
        irPlan.close();
        PegasusClient.JobRequest request = PegasusClient.JobRequest.parseFrom((byte[])physicalPlanBytes);
        PegasusClient.JobConfig jobConfig = PegasusClient.JobConfig.newBuilder().setJobId(jobId).setJobName(jobName).setWorkers(PegasusConfig.PEGASUS_WORKER_NUM.get(this.configs).intValue()).setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(this.configs).intValue()).setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(this.configs).intValue()).setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(this.configs).intValue()).setTimeLimit((long)PegasusConfig.PEGASUS_TIMEOUT.get(this.configs).intValue()).setAll(PegasusClient.Empty.newBuilder().build()).build();
        request = request.toBuilder().setConf(jobConfig).build();
        this.broadcastProcessor.broadcast(request, resultProcessor);
        this.metaQueryCallback.afterExec(irMeta);
    }

    public static void applyStrategies(Traversal traversal) {
        TraversalStrategies traversalStrategies = traversal.asAdmin().getStrategies();
        Set strategies = (Set)Utils.getFieldValue(DefaultTraversalStrategies.class, traversalStrategies, "traversalStrategies");
        strategies.clear();
        strategies.add(ScanFusionStepStrategy.instance());
        strategies.add(RemoveUselessStepStrategy.instance());
        strategies.add(InlineFilterStrategy.instance());
        strategies.add(ExpandFusionStepStrategy.instance());
        traversal.asAdmin().applyStrategies();
    }
}

