/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.gremlin.plugin.processor;

import com.alibaba.graphscope.common.IrPlan;
import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.FrontendConfig;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.exception.FrontendException;
import com.alibaba.graphscope.common.intermediate.InterOpCollection;
import com.alibaba.graphscope.common.ir.meta.IrMeta;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.ir.tools.QueryCache;
import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
import com.alibaba.graphscope.common.utils.ClassUtils;
import com.alibaba.graphscope.gremlin.InterOpCollectionBuilder;
import com.alibaba.graphscope.gremlin.Utils;
import com.alibaba.graphscope.gremlin.plugin.MetricsCollector;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.plugin.processor.LifeCycleSupplier;
import com.alibaba.graphscope.gremlin.plugin.strategy.ExpandFusionStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.RemoveUselessStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.ScanFusionStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.traversal.IrCustomizedTraversal;
import com.alibaba.graphscope.gremlin.plugin.traversal.IrCustomizedTraversalSource;
import com.alibaba.graphscope.gremlin.result.processor.AbstractResultProcessor;
import com.alibaba.graphscope.gremlin.result.processor.GremlinResultProcessor;
import com.alibaba.graphscope.proto.frontend.Code;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.IdGenerator;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.script.Bindings;
import javax.script.SimpleBindings;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.codehaus.groovy.control.MultipleCompilationErrorsException;

public class IrStandardOpProcessor
extends StandardOpProcessor {
    protected final Graph graph;
    protected final GraphTraversalSource g;
    protected final Configs configs;
    protected final RpcClient rpcClient;
    protected final IrMetaQueryCallback metaQueryCallback;
    protected final QueryIdGenerator idGenerator;
    protected final QueryCache queryCache;
    protected final GraphPlanner graphPlanner;
    protected final ExecutionClient executionClient;
    protected Tracer tracer;
    protected LongHistogram queryHistogram;
    protected long printThreshold;
    protected IdGenerator opentelemetryIdGenerator;

    public IrStandardOpProcessor(Configs configs, QueryIdGenerator idGenerator, QueryCache queryCache, GraphPlanner graphPlanner, ExecutionClient executionClient, ChannelFetcher fetcher, IrMetaQueryCallback metaQueryCallback, Graph graph, GraphTraversalSource g) {
        this.graph = graph;
        this.g = g;
        this.configs = configs;
        this.rpcClient = FrontendConfig.ENGINE_TYPE.get(this.configs).equals("pegasus") ? new RpcClient(fetcher.fetch()) : null;
        this.metaQueryCallback = metaQueryCallback;
        this.idGenerator = idGenerator;
        this.queryCache = queryCache;
        this.graphPlanner = graphPlanner;
        this.executionClient = executionClient;
        this.printThreshold = FrontendConfig.QUERY_PRINT_THRESHOLD_MS.get(configs);
        this.opentelemetryIdGenerator = IdGenerator.random();
        this.initTracer();
        this.initMetrics();
    }

    protected void evalOpInternal(org.apache.tinkerpop.gremlin.server.Context ctx, Supplier<GremlinExecutor> gremlinExecutorSupplier, AbstractEvalOpProcessor.BindingSupplier bindingsSupplier) {
        GremlinExecutor.LifeCycle lifeCycle;
        RequestMessage msg = ctx.getRequestMessage();
        GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
        Map args = msg.getArgs();
        String script = (String)args.get("gremlin");
        Map bindings = args.get("bindings") == null ? null : (Map)args.get("bindings");
        String upTraceId = bindings == null || bindings.get("X-Trace-ID") == null ? null : String.valueOf(bindings.get("X-Trace-ID"));
        String defaultValidateQuery = "''";
        if (script.equals(defaultValidateQuery)) {
            ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SUCCESS).create());
            return;
        }
        BigInteger jobId = this.idGenerator.generateId();
        String jobName = this.idGenerator.generateName(jobId);
        String language = FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.get(this.configs);
        IrMeta irMeta = ClassUtils.callExceptionWithDetails(() -> this.metaQueryCallback.beforeExec(), Code.META_SCHEMA_NOT_READY, Map.of("QueryId", jobId));
        if (irMeta.getSchema().getVertexList().isEmpty() && irMeta.getSchema().getEdgeList().isEmpty()) {
            language = "antlr_gremlin_traversal";
        }
        QueryStatusCallback statusCallback = ClassUtils.createQueryStatusCallback(jobId, upTraceId, script, new MetricsCollector.Gremlin(evalOpTimer), this.queryHistogram, this.configs);
        statusCallback.getQueryLogger().info("[compile]: query received", new Object[0]);
        QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
        switch (language) {
            case "antlr_gremlin_traversal": {
                lifeCycle = this.createLifeCycle(ctx, gremlinExecutorSupplier, bindingsSupplier, irMeta, statusCallback, timeoutConfig);
                break;
            }
            case "antlr_gremlin_calcite": {
                lifeCycle = new LifeCycleSupplier(this.configs, ctx, this.queryCache, this.graphPlanner, this.executionClient, jobId, jobName, irMeta, statusCallback, timeoutConfig).get();
                break;
            }
            default: {
                throw new IllegalArgumentException("invalid script language name: " + language);
            }
        }
        try {
            CompletableFuture evalFuture = gremlinExecutor.eval(script, language, (Bindings)new SimpleBindings(), lifeCycle);
            evalFuture.handle((v, t) -> {
                this.metaQueryCallback.afterExec(irMeta);
                if (t instanceof FrontendException) {
                    ((FrontendException)t).getDetails().put("QueryId", jobId);
                }
                if (t != null && !(t instanceof TimeoutException)) {
                    statusCallback.onErrorEnd(t.getMessage());
                    Optional possibleTemporaryException = IrStandardOpProcessor.determineIfTemporaryException((Throwable)t);
                    if (possibleTemporaryException.isPresent()) {
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_TEMPORARY).statusMessage(((Throwable)possibleTemporaryException.get()).getMessage()).statusAttributeException((Throwable)possibleTemporaryException.get()).create());
                    } else if (t instanceof OpProcessorException) {
                        ctx.writeAndFlush(((OpProcessorException)((Object)t)).getResponseMessage());
                    } else if (t instanceof TimedInterruptTimeoutException) {
                        String errorMessage = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider", msg);
                        statusCallback.getQueryLogger().warn(errorMessage, new Object[0]);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider").statusAttributeException(t).create());
                    } else if (t instanceof MultipleCompilationErrorsException && t.getMessage().contains("Method too large") && ((MultipleCompilationErrorsException)t).getErrorCollector().getErrorCount() == 1) {
                        String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", msg);
                        statusCallback.getQueryLogger().warn(errorMessage, new Object[0]);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION).statusMessage(errorMessage).statusAttributeException(t).create());
                    } else {
                        String errorMessage = t.getMessage() == null ? t.toString() : t.getMessage();
                        statusCallback.getQueryLogger().warn(String.format("Exception processing a script on request [%s].", msg), t);
                        ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION).statusMessage(errorMessage).statusAttributeException(t).create());
                    }
                }
                return null;
            });
        }
        catch (RejectedExecutionException var17) {
            statusCallback.getQueryLogger().error(var17);
            ctx.writeAndFlush(ResponseMessage.build((RequestMessage)msg).code(ResponseStatusCode.TOO_MANY_REQUESTS).statusMessage(var17.getMessage()).create());
        }
    }

    protected GremlinExecutor.LifeCycle createLifeCycle(org.apache.tinkerpop.gremlin.server.Context ctx, Supplier<GremlinExecutor> gremlinExecutorSupplier, AbstractEvalOpProcessor.BindingSupplier bindingsSupplier, IrMeta irMeta, QueryStatusCallback statusCallback, QueryTimeoutConfig timeoutConfig) {
        return GremlinExecutor.LifeCycle.build().evaluationTimeoutOverride(Long.valueOf(timeoutConfig.getExecutionTimeoutMS())).beforeEval(b -> {
            try {
                b.putAll(bindingsSupplier.get());
                b.put("graph", (Object)this.graph);
                b.put("g", (Object)this.g);
            }
            catch (OpProcessorException ope) {
                throw new RuntimeException(ope);
            }
        }).transformResult(o -> {
            if (o != null && o instanceof Traversal) {
                IrStandardOpProcessor.applyStrategies((Traversal)o);
            }
            statusCallback.getQueryLogger().info("[compile]: traversal compiled", new Object[0]);
            return o;
        }).withResult(o -> {
            if (o != null && o instanceof Traversal) {
                Traversal traversal = (Traversal)o;
                this.processTraversal(traversal, new GremlinResultProcessor(this.configs, ctx, traversal, statusCallback, timeoutConfig), irMeta, timeoutConfig, statusCallback.getQueryLogger());
            }
        }).create();
    }

    protected void processTraversal(Traversal traversal, AbstractResultProcessor resultProcessor, IrMeta irMeta, QueryTimeoutConfig timeoutConfig, QueryLogger queryLogger) {
        Span outgoing;
        Configs queryConfigs = this.getQueryConfigs(traversal);
        InterOpCollection logicalPlan = ClassUtils.callException(() -> {
            InterOpCollection opCollection = new InterOpCollectionBuilder(traversal).build();
            InterOpCollection.applyStrategies(opCollection);
            InterOpCollection.process(opCollection);
            return opCollection;
        }, Code.LOGICAL_PLAN_BUILD_FAILED);
        queryLogger.info("[compile]: logical IR compiled", new Object[0]);
        StringBuilder irPlanStr = new StringBuilder();
        PegasusClient.JobRequest physicalRequest = ClassUtils.callException(() -> {
            IrPlan irPlan = new IrPlan(irMeta, logicalPlan);
            queryLogger.info("Submitted query", new Object[0]);
            irPlanStr.append(irPlan.getPlanAsJson());
            queryLogger.debug("ir plan {}", irPlanStr.toString());
            queryLogger.setIrPlan(irPlanStr.toString());
            byte[] physicalPlanBytes = irPlan.toPhysicalBytes(queryConfigs);
            irPlan.close();
            BigInteger jobId = queryLogger.getQueryId();
            PegasusClient.JobRequest request = PegasusClient.JobRequest.newBuilder().setPlan(ByteString.copyFrom((byte[])physicalPlanBytes)).build();
            String jobName = "ir_plan_" + jobId;
            PegasusClient.JobConfig jobConfig = PegasusClient.JobConfig.newBuilder().setJobId(jobId.longValue()).setJobName(jobName).setWorkers(PegasusConfig.PEGASUS_WORKER_NUM.get(queryConfigs).intValue()).setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(queryConfigs).intValue()).setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(queryConfigs).intValue()).setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(queryConfigs).intValue()).setTimeLimit(timeoutConfig.getEngineTimeoutMS()).setAll(PegasusClient.Empty.newBuilder().build()).build();
            request = request.toBuilder().setConf(jobConfig).build();
            return request;
        }, Code.PHYSICAL_PLAN_BUILD_FAILED);
        queryLogger.info("[compile]: physical IR compiled", new Object[0]);
        if (TraceId.isValid((CharSequence)queryLogger.getUpstreamId())) {
            SpanContext spanContext = SpanContext.createFromRemoteParent((String)queryLogger.getUpstreamId(), (String)this.opentelemetryIdGenerator.generateSpanId(), (TraceFlags)TraceFlags.getDefault(), (TraceState)TraceState.getDefault());
            outgoing = this.tracer.spanBuilder("frontend/query").setParent(Context.current().with((ImplicitContextKeyed)Span.wrap((SpanContext)spanContext))).setSpanKind(SpanKind.CLIENT).startSpan();
        } else {
            outgoing = this.tracer.spanBuilder("frontend/query").setSpanKind(SpanKind.CLIENT).startSpan();
        }
        try (Scope ignored = outgoing.makeCurrent();){
            outgoing.setAttribute("query.id", queryLogger.getQueryId().toString());
            outgoing.setAttribute("query.statement", queryLogger.getQuery());
            outgoing.setAttribute("query.plan", irPlanStr.toString());
            this.rpcClient.submit(physicalRequest, (ResultProcessor)resultProcessor, timeoutConfig.getChannelTimeoutMS());
            resultProcessor.request();
        }
        catch (Throwable t) {
            outgoing.setStatus(StatusCode.ERROR, "Submit failed!");
            outgoing.recordException(t);
            throw t;
        }
        finally {
            outgoing.end();
        }
    }

    private Configs getQueryConfigs(Traversal traversal) {
        Optional sourceOpt;
        Iterator<Object> keyIterator = this.configs.getKeys();
        HashMap configMap = Maps.newHashMap();
        while (keyIterator.hasNext()) {
            String key = keyIterator.next().toString();
            configMap.put(key, this.configs.get(key));
        }
        if (traversal instanceof IrCustomizedTraversal && (sourceOpt = ((IrCustomizedTraversal)traversal).getTraversalSource()).isPresent()) {
            configMap.putAll(((IrCustomizedTraversalSource)((Object)sourceOpt.get())).getConfigs());
        }
        return new Configs(configMap);
    }

    public static void applyStrategies(Traversal traversal) {
        TraversalStrategies traversalStrategies = traversal.asAdmin().getStrategies();
        Set strategies = (Set)Utils.getFieldValue(DefaultTraversalStrategies.class, traversalStrategies, "traversalStrategies");
        strategies.clear();
        strategies.add(ScanFusionStepStrategy.instance());
        strategies.add(RemoveUselessStepStrategy.instance());
        strategies.add(InlineFilterStrategy.instance());
        strategies.add(ExpandFusionStepStrategy.instance());
        traversal.asAdmin().applyStrategies();
    }

    public void close() throws Exception {
        if (this.rpcClient != null) {
            this.rpcClient.shutdown();
        }
    }

    public void initTracer() {
        this.tracer = GlobalOpenTelemetry.getTracer((String)"default");
    }

    public void initMetrics() {
        Meter meter = GlobalOpenTelemetry.getMeter((String)"default");
        this.queryHistogram = meter.histogramBuilder("groot.frontend.query.duration").setDescription("Duration of gremlin queries.").setUnit("ms").ofLongs().build();
    }
}

