/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.gremlin.resultx;

import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.FrontendConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.exception.FrontendException;
import com.alibaba.graphscope.common.result.RecordParser;
import com.alibaba.graphscope.common.utils.ClassUtils;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.resultx.ResultSchema;
import com.alibaba.graphscope.proto.frontend.Code;
import com.alibaba.pegasus.common.StreamIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.server.Context;

/**
 * Relays the records produced by one query execution to the Gremlin client.
 *
 * <p>The {@link ExecutionResponseListener} callbacks ({@link #onNext}, {@link #onCompleted},
 * {@link #onError}) feed incoming records into an internal {@link StreamIterator}. {@link
 * #request()} drains that iterator, parses every record with the configured {@link RecordParser}
 * and writes the parsed results to the Gremlin {@link Context}.
 *
 * <p>When {@code resultSchema.isGroupBy} is set, the parsed maps are merged into a single
 * insertion-ordered map that is sent with the final {@code SUCCESS} response; otherwise each
 * record's results are sent immediately as {@code PARTIAL_CONTENT}.
 */
public class GremlinResultProcessor implements ExecutionResponseListener {
    protected final Context ctx;
    protected final QueryStatusCallback statusCallback;
    protected final RecordParser<Object> recordParser;
    protected final ResultSchema resultSchema;
    /** Accumulates the merged map for group-by queries; keeps insertion order. */
    protected final Map<Object, Object> reducer;
    /** Buffer between the listener callbacks (producer) and {@link #request()} (consumer). */
    protected final StreamIterator<IrResult.Record> recordStreamIterator;
    protected final QueryTimeoutConfig timeoutConfig;

    /**
     * Creates a processor for a single query.
     *
     * @param configs source of {@code FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY}, used as
     *     the capacity argument of the internal stream iterator
     * @param ctx Gremlin server context the responses are written to
     * @param recordParser parser turning each {@link IrResult.Record} into result objects
     * @param resultSchema schema flags of the query result ({@code isGroupBy} is consulted)
     * @param statusCallback callback notified when the query ends successfully or with an error
     * @param timeoutConfig timeout settings used when building error messages
     */
    // The parameter stays a raw RecordParser so existing callers keep compiling; the unchecked
    // assignment below is therefore suppressed at constructor scope only.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public GremlinResultProcessor(Configs configs, Context ctx, RecordParser recordParser, ResultSchema resultSchema, QueryStatusCallback statusCallback, QueryTimeoutConfig timeoutConfig) {
        this.ctx = ctx;
        this.recordParser = recordParser;
        this.resultSchema = resultSchema;
        this.statusCallback = statusCallback;
        this.timeoutConfig = timeoutConfig;
        this.reducer = Maps.newLinkedHashMap();
        this.recordStreamIterator = new StreamIterator<>(FrontendConfig.PER_QUERY_STREAM_BUFFER_MAX_CAPACITY.get(configs).intValue());
    }

    /**
     * Hands one incoming record over to the stream iterator.
     *
     * @param record the record delivered by the execution client
     * @throws RuntimeException wrapping any exception raised while enqueuing the record
     */
    @Override
    public void onNext(IrResult.Record record) {
        try {
            this.recordStreamIterator.putData(record);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks the record stream as finished.
     *
     * @throws RuntimeException wrapping any exception raised while finishing the stream
     */
    @Override
    public void onCompleted() {
        try {
            this.recordStreamIterator.finish();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Propagates an execution failure to the stream iterator.
     *
     * @param t the failure reported by the execution client
     */
    @Override
    public void onError(Throwable t) {
        this.recordStreamIterator.fail(t);
    }

    /**
     * Drains the record stream, processing each record and then sending the final response.
     *
     * <p>Any {@link Throwable} raised while draining is converted into an execution exception,
     * reported through {@code statusCallback.onErrorEnd} and written to the client as a
     * {@code SERVER_ERROR} response. The stream iterator is closed in all cases.
     */
    public void request() {
        try {
            while (this.recordStreamIterator.hasNext()) {
                this.processRecord(this.recordStreamIterator.next());
            }
            this.finishRecord();
        } catch (Throwable t) {
            // A failure whose direct cause is an InterruptedException is reported as a timeout
            // (see the message below); everything else goes through the common handler.
            Exception executionException =
                    t.getCause() instanceof InterruptedException
                            ? new FrontendException(
                                    Code.TIMEOUT,
                                    ClassUtils.getTimeoutError(
                                            "Timeout has been detected by gremlin executor",
                                            this.timeoutConfig),
                                    t)
                            : ClassUtils.handleExecutionException(t, this.timeoutConfig);
            if (executionException instanceof FrontendException) {
                // Attach the query id so the error can be correlated with the query log.
                ((FrontendException) executionException)
                        .getDetails()
                        .put("QueryId", this.statusCallback.getQueryLogger().getQueryId());
            }
            String errorMsg = executionException.getMessage();
            this.statusCallback.onErrorEnd(executionException, errorMsg);
            this.ctx.writeAndFlush(
                    ResponseMessage.build(this.ctx.getRequestMessage())
                            .code(ResponseStatusCode.SERVER_ERROR)
                            .statusMessage(errorMsg)
                            .create());
        } finally {
            // The field is final and always assigned in the constructor, so no null guard is needed.
            this.recordStreamIterator.close();
        }
    }

    /**
     * Parses one record and either forwards its results or folds them into {@link #reducer}.
     *
     * @param record the record to parse
     * @throws IllegalArgumentException if the query is a group-by and any parsed result is not a
     *     {@link Map}
     */
    protected void processRecord(IrResult.Record record) {
        List<?> results =
                ClassUtils.callException(
                        () -> this.recordParser.parseFrom(record), Code.GREMLIN_INVALID_RESULT);
        if (!this.resultSchema.isGroupBy) {
            this.ctx.writeAndFlush(
                    ResponseMessage.build(this.ctx.getRequestMessage())
                            .code(ResponseStatusCode.PARTIAL_CONTENT)
                            .result(results)
                            .create());
            return;
        }
        if (results.isEmpty()) {
            return;
        }
        // Validate every element before merging so a bad record never leaves the reducer
        // partially updated.
        if (results.stream().anyMatch(k -> !(k instanceof Map))) {
            throw new IllegalArgumentException("cannot reduce results " + results + " into a single map");
        }
        for (Object result : results) {
            this.reducer.putAll((Map<?, ?>) result);
        }
    }

    /**
     * Reports successful completion and writes the terminating {@code SUCCESS} response, which
     * carries the merged map for group-by queries and an empty list otherwise.
     */
    protected void finishRecord() {
        this.statusCallback.onSuccessEnd();
        List<Object> results = Lists.newArrayList();
        if (this.resultSchema.isGroupBy) {
            results.add(this.reducer);
        }
        this.ctx.writeAndFlush(
                ResponseMessage.build(this.ctx.getRequestMessage())
                        .code(ResponseStatusCode.SUCCESS)
                        .result(results)
                        .create());
    }
}

