/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.gremlin.result;

import com.alibaba.graphscope.common.client.ResultParser;
import com.alibaba.graphscope.gremlin.result.GroupResultParser;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import io.grpc.Status;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.handler.Frame;
import org.apache.tinkerpop.gremlin.server.handler.StateKey;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulates results parsed from Pegasus job responses and writes them back to the
 * Gremlin client channel held by the request {@link Context}.
 *
 * <p>Results are buffered in {@link #resultCollectors}; once the buffer has reached
 * {@link #resultCollectorsBatchSize} entries it is flushed as a
 * {@link ResponseStatusCode#PARTIAL_CONTENT} response. {@link #finish()} flushes the
 * remainder as {@link ResponseStatusCode#SUCCESS}. After a terminal response (success or
 * error) has been attempted the processor is {@link #locked} and later callbacks are
 * ignored.
 *
 * <p>{@link #process}, {@link #finish()} and {@link #error} synchronize on this instance.
 */
public class GremlinResultProcessor extends StandardOpProcessor implements ResultProcessor {
    private static final Logger logger = LoggerFactory.getLogger(GremlinResultProcessor.class);

    /** Request context whose channel the results are written to. */
    protected Context writeResult;
    /** Results buffered since the last flush. */
    protected List<Object> resultCollectors;
    /** Buffer size at which a partial response is flushed. */
    protected int resultCollectorsBatchSize;
    /** Set once a terminal response has been attempted; further callbacks become no-ops. */
    protected boolean locked = false;
    /** Converts a job response into the result objects to be returned to the client. */
    protected ResultParser resultParser;

    /**
     * Creates a processor for a single request.
     *
     * <p>The batch size is read from the request's optional {@code batchSize} argument and
     * falls back to {@code settings.resultIterationBatchSize}.
     *
     * @param writeResult request context providing the request message, settings and channel
     * @param resultParser parser applied to every job response
     */
    public GremlinResultProcessor(Context writeResult, ResultParser resultParser) {
        this.writeResult = writeResult;
        this.resultParser = resultParser;
        RequestMessage msg = writeResult.getRequestMessage();
        Settings settings = writeResult.getSettings();
        // The explicit type witness is required: without it the Optional's type parameter
        // is inferred as Object and the result cannot be assigned to an int.
        this.resultCollectorsBatchSize =
                msg.<Integer>optionalArgs("batchSize").orElse(settings.resultIterationBatchSize);
        this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize);
    }

    /**
     * Parses one job response and buffers its results, flushing the buffer as a partial
     * response first if it has already reached the batch size.
     *
     * <p>On any failure a {@link ResponseStatusCode#SERVER_ERROR} response is sent, the
     * processor is locked, and the failure is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param response job response to parse
     * @throws RuntimeException if parsing or writing fails
     */
    public void process(PegasusClient.JobResponse response) {
        synchronized (this) {
            if (this.locked) {
                return;
            }
            try {
                if (this.resultCollectors.size() >= this.resultCollectorsBatchSize) {
                    this.formatResultIfNeed();
                    this.writeResultList(
                            this.writeResult,
                            this.resultCollectors,
                            ResponseStatusCode.PARTIAL_CONTENT);
                    this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize);
                }
                this.resultCollectors.addAll(this.resultParser.parseFrom(response));
            } catch (Exception e) {
                // Lock before reporting so that a failure while writing the error response
                // cannot leave the processor accepting further results.
                this.locked = true;
                try {
                    this.writeResultList(
                            this.writeResult,
                            Collections.<Object>singletonList(describe(e)),
                            ResponseStatusCode.SERVER_ERROR);
                } catch (RuntimeException reportFailure) {
                    // Keep the original failure as the primary cause.
                    e.addSuppressed(reportFailure);
                }
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Flushes the remaining buffered results as a {@link ResponseStatusCode#SUCCESS}
     * response and locks the processor. Does nothing if already locked.
     */
    public void finish() {
        synchronized (this) {
            if (!this.locked) {
                this.formatResultIfNeed();
                this.writeResultList(
                        this.writeResult, this.resultCollectors, ResponseStatusCode.SUCCESS);
                this.locked = true;
            }
        }
    }

    /**
     * When the parser is a {@link GroupResultParser}, merges every buffered entry (each is
     * cast to a {@link Map}) into a single insertion-ordered map and replaces the buffer
     * content with that one map. Later entries overwrite earlier ones on key collision.
     */
    protected void formatResultIfNeed() {
        if (this.resultParser instanceof GroupResultParser) {
            Map<Object, Object> groupResult = new LinkedHashMap<>();
            for (Object partial : this.resultCollectors) {
                groupResult.putAll((Map<?, ?>) partial);
            }
            this.resultCollectors.clear();
            this.resultCollectors.add(groupResult);
        }
    }

    /**
     * Reports a failed job to the client as a {@link ResponseStatusCode#SERVER_ERROR}
     * response carrying {@code status.toString()}, then locks the processor. Does nothing
     * if already locked.
     *
     * @param status gRPC status describing the failure
     */
    public void error(Status status) {
        synchronized (this) {
            if (!this.locked) {
                this.writeResultList(
                        this.writeResult,
                        Collections.<Object>singletonList(status.toString()),
                        ResponseStatusCode.SERVER_ERROR);
                this.locked = true;
            }
        }
    }

    /**
     * Writes a list of results (or an error message) to the client channel.
     *
     * <p>For {@link ResponseStatusCode#SERVER_ERROR} the first list element, if present and
     * non-null, is used as the status message and the response is written without waiting
     * for completion. For any other status code the results are serialized into a frame
     * and the call blocks until the write completes. If the channel is not writable, the
     * method waits 10 ms and checks once more before giving up.
     *
     * @param context request context providing the channel and request message
     * @param resultList results to send, or a single-element list holding the error message
     * @param statusCode response status code
     * @throws RuntimeException if the channel stays unwritable, the write fails, or the
     *     thread is interrupted (the interrupt flag is restored before throwing)
     */
    protected void writeResultList(
            Context context, List<Object> resultList, ResponseStatusCode statusCode) {
        ChannelHandlerContext ctx = context.getChannelHandlerContext();
        RequestMessage msg = context.getRequestMessage();
        MessageSerializer serializer = ctx.channel().attr(StateKey.SERIALIZER).get();
        boolean useBinary = ctx.channel().attr(StateKey.USE_BINARY).get();

        if (statusCode == ResponseStatusCode.SERVER_ERROR) {
            ResponseMessage.Builder builder =
                    ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR);
            if (!resultList.isEmpty()) {
                Object first = resultList.get(0);
                if (first != null) {
                    // String.valueOf avoids a ClassCastException on a non-String payload.
                    builder.statusMessage(String.valueOf(first));
                }
            }
            ctx.writeAndFlush(builder.create());
            return;
        }

        // Give a busy channel one short pause to drain before failing the write.
        boolean retried = false;
        while (!ctx.channel().isWritable()) {
            if (retried) {
                String message =
                        "write result to context fail for context " + msg + " is too busy";
                logger.error(message);
                throw new RuntimeException(message);
            }
            logger.warn(
                    "Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up",
                    msg);
            retried = true;
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        Frame frame = null;
        try {
            frame =
                    makeFrame(
                            context,
                            msg,
                            serializer,
                            useBinary,
                            resultList,
                            statusCode,
                            Collections.emptyMap(),
                            Collections.emptyMap());
            ctx.writeAndFlush(frame).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (frame != null) {
                frame.tryRelease();
            }
            logger.error(
                    "write {} result to context {} status code=>{} fail",
                    resultList.size(),
                    context,
                    statusCode,
                    e);
            throw new RuntimeException(e);
        }
    }

    /** Returns the throwable's message, or its {@code toString()} when it has no message. */
    private static String describe(Throwable t) {
        String message = t.getMessage();
        return message != null ? message : t.toString();
    }
}

