/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.graphscope.common.client;

import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.HiactorConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.gaia.proto.GraphAlgebraPhysical;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.graphscope.gaia.proto.StoredProcedure;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.client.impl.DefaultSession;
import com.google.common.collect.Lists;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpExecutionClient
extends ExecutionClient<URI> {
    private static final Logger logger = LoggerFactory.getLogger(HttpExecutionClient.class);
    private final Session session;

    public HttpExecutionClient(Configs graphConfig, ChannelFetcher<URI> channelFetcher) {
        super(channelFetcher);
        this.session = DefaultSession.newInstance((String)HiactorConfig.INTERACTIVE_ADMIN_ENDPOINT.get(graphConfig), (String)HiactorConfig.INTERACTIVE_QUERY_ENDPOINT.get(graphConfig));
    }

    @Override
    public void submit(ExecutionRequest request, ExecutionResponseListener listener, QueryTimeoutConfig timeoutConfig) throws Exception {
        ArrayList responseFutures = Lists.newArrayList();
        for (URI httpURI : this.channelFetcher.fetch()) {
            CompletableFuture future;
            byte[] bytes;
            if (request.getRequestLogical().getRegularQuery() != null) {
                bytes = (byte[])request.getRequestPhysical().getContent();
                future = this.session.runAdhocQueryAsync(GraphAlgebraPhysical.PhysicalPlan.parseFrom(bytes));
            } else if (request.getRequestLogical().getProcedureCall() != null) {
                bytes = (byte[])request.getRequestPhysical().getContent();
                future = this.session.callProcedureAsync(StoredProcedure.Query.parseFrom(bytes));
            } else {
                throw new IllegalArgumentException("the request can not be sent to the remote service, expect a regular query or a procedure call");
            }
            CompletionStage responseFuture = future.whenComplete((response, exception) -> {
                if (exception != null) {
                    listener.onError((Throwable)exception);
                }
                if (!response.isOk()) {
                    String errorMessage = new String(response.getStatusMessage());
                    RuntimeException ex = new RuntimeException("Query execution failed: response status code is " + (Enum)response.getStatusCode() + ", error message: " + errorMessage);
                    listener.onError(ex);
                }
                IrResult.CollectiveResults results = (IrResult.CollectiveResults)response.getValue();
                for (IrResult.Results irResult : results.getResultsList()) {
                    listener.onNext(irResult.getRecord());
                }
            });
            responseFutures.add(responseFuture);
        }
        CompletableFuture<Void> joinFuture = CompletableFuture.runAsync(() -> {
            try {
                CompletableFuture.allOf(responseFutures.toArray(new CompletableFuture[0])).get();
                listener.onCompleted();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        joinFuture.whenComplete((aVoid, exception) -> {
            if (exception != null) {
                listener.onError((Throwable)exception);
            }
        });
    }

    @Override
    public void close() throws Exception {
    }
}

