/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcExecutionClient
extends ExecutionClient<RpcChannel> {
    Logger logger = LoggerFactory.getLogger(RpcExecutionClient.class);
    private final Configs graphConfig;
    private final AtomicReference<RpcClient> rpcClientRef;

    public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channelFetcher) {
        super(channelFetcher);
        this.graphConfig = graphConfig;
        this.rpcClientRef = new AtomicReference();
    }

    @Override
    public void submit(ExecutionRequest request, final ExecutionResponseListener listener, QueryTimeoutConfig timeoutConfig) throws Exception {
        if (this.rpcClientRef.get() == null) {
            this.rpcClientRef.compareAndSet(null, new RpcClient(this.channelFetcher.fetch()));
        }
        RpcClient rpcClient = this.rpcClientRef.get();
        PegasusClient.JobRequest jobRequest = PegasusClient.JobRequest.newBuilder().setPlan(ByteString.copyFrom((byte[])((byte[])request.getRequestPhysical().getContent()))).build();
        PegasusClient.JobConfig jobConfig = PegasusClient.JobConfig.newBuilder().setJobId(request.getRequestId().longValue()).setJobName(request.getRequestName()).setWorkers(PegasusConfig.PEGASUS_WORKER_NUM.get(this.graphConfig).intValue()).setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(this.graphConfig).intValue()).setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(this.graphConfig).intValue()).setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(this.graphConfig).intValue()).setTimeLimit(timeoutConfig.getEngineTimeoutMS()).setAll(PegasusClient.Empty.newBuilder().build()).build();
        jobRequest = jobRequest.toBuilder().setConf(jobConfig).build();
        rpcClient.submit(jobRequest, new ResultProcessor(){

            public void process(PegasusClient.JobResponse jobResponse) {
                try {
                    listener.onNext(IrResult.Results.parseFrom(jobResponse.getResp()).getRecord());
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            public void finish() {
                listener.onCompleted();
                RpcExecutionClient.this.logger.info("[compile]: received results from engine");
            }

            public void error(Status status) {
                listener.onError((Throwable)status.asException());
            }
        }, timeoutConfig.getChannelTimeoutMS());
    }

    @Override
    public void close() throws Exception {
        if (this.rpcClientRef.get() != null) {
            this.rpcClientRef.get().shutdown();
        }
    }
}

