/*
 * Decompiled with CFR 0.152.
 */
package io.ray.serve.api;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.PyActorHandle;
import io.ray.api.Ray;
import io.ray.api.function.PyActorMethod;
import io.ray.serve.config.DeploymentConfig;
import io.ray.serve.config.ReplicaConfig;
import io.ray.serve.controller.ServeController;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.DeploymentRoute;
import io.ray.serve.generated.DeploymentRouteList;
import io.ray.serve.generated.DeploymentStatus;
import io.ray.serve.generated.DeploymentStatusInfo;
import io.ray.serve.generated.EndpointInfo;
import io.ray.serve.generated.StatusOverview;
import io.ray.serve.handle.RayServeHandle;
import io.ray.serve.util.LogUtil;
import io.ray.serve.util.ServeProtoUtil;
import io.ray.shaded.com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServeControllerClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServeControllerClient.class);
    private static long CLIENT_POLLING_INTERVAL_S = 1L;
    private BaseActorHandle controller;
    private String controllerName;
    private boolean detached;
    private boolean shutdown;
    private Map<String, RayServeHandle> handleCache = new ConcurrentHashMap<String, RayServeHandle>();
    private String rootUrl;

    public ServeControllerClient(BaseActorHandle controller, String controllerName, boolean detached) {
        this.controller = controller;
        this.controllerName = controllerName;
        this.detached = detached;
        this.rootUrl = controller instanceof PyActorHandle ? (String)((PyActorHandle)controller).task(PyActorMethod.of("get_root_url")).remote().get() : ((ActorHandle)controller).task(ServeController::getRootUrl).remote().get();
    }

    public RayServeHandle getHandle(String deploymentName, boolean missingOk) {
        String cacheKey = deploymentName + "#" + missingOk;
        if (this.handleCache.containsKey(cacheKey)) {
            return this.handleCache.get(cacheKey);
        }
        Map<String, EndpointInfo> endpoints = null;
        if (this.controller instanceof PyActorHandle) {
            endpoints = ServeProtoUtil.parseEndpointSet((byte[])((PyActorHandle)this.controller).task(PyActorMethod.of("get_all_endpoints_java")).remote().get());
        } else {
            LOGGER.warn("Client currently only supports the Python controller.");
            endpoints = ServeProtoUtil.parseEndpointSet(((ActorHandle)this.controller).task(ServeController::getAllEndpoints).remote().get());
        }
        if (!(missingOk || endpoints != null && endpoints.containsKey(deploymentName))) {
            throw new RayServeException(LogUtil.format("Deployment {} does not exist.", deploymentName));
        }
        RayServeHandle handle = new RayServeHandle(this.controller, deploymentName, null, null);
        this.handleCache.put(cacheKey, handle);
        return handle;
    }

    public void deploy(String name, String deploymentDef, Object[] initArgs, Map<String, Object> rayActorOptions, DeploymentConfig deploymentConfig, String version, String prevVersion, String routePrefix, String url, Boolean blocking) {
        if (deploymentConfig == null) {
            deploymentConfig = new DeploymentConfig();
        }
        if (rayActorOptions == null) {
            rayActorOptions = new HashMap<String, Object>();
        }
        ReplicaConfig replicaConfig = new ReplicaConfig(deploymentDef, initArgs, rayActorOptions);
        deploymentConfig.setVersion(version);
        deploymentConfig.setPrevVersion(prevVersion);
        if (deploymentConfig.getAutoscalingConfig() != null && deploymentConfig.getMaxConcurrentQueries() < deploymentConfig.getAutoscalingConfig().getTargetNumOngoingRequestsPerReplica()) {
            LOGGER.warn("Autoscaling will never happen, because 'max_concurrent_queries' is less than 'target_num_ongoing_requests_per_replica'.");
        }
        boolean updating = (Boolean)((PyActorHandle)this.controller).task(PyActorMethod.of("deploy"), name, deploymentConfig.toProtoBytes(), replicaConfig.toProtoBytes(), routePrefix, Ray.getRuntimeContext().getCurrentJobId().getBytes()).remote().get();
        String tag = "component=serve deployment=" + name;
        if (updating) {
            String msg = LogUtil.format("Updating deployment '{}'", name);
            if (StringUtils.isNotBlank(version)) {
                msg = msg + LogUtil.format(" to version '{}'", version);
            }
            LOGGER.info("{}. {}", (Object)msg, (Object)tag);
        } else {
            LOGGER.info("Deployment '{}' is already at version '{}', not updating. {}", name, version, tag);
        }
        if (blocking.booleanValue()) {
            this.waitForDeploymentHealthy(name);
            String urlPart = url != null ? LogUtil.format(" at `{}`", url) : "";
            LOGGER.info("Deployment '{}{}' is ready {}. {}", name, StringUtils.isNotBlank(version) ? "':'" + version : "", urlPart, tag);
        }
    }

    private void waitForDeploymentHealthy(String name, Long timeoutS) {
        long start = System.currentTimeMillis();
        boolean isTimeout = true;
        while (timeoutS == null || System.currentTimeMillis() - start < timeoutS * 1000L) {
            DeploymentStatusInfo status = this.getDeploymentStatus(name);
            if (status == null) {
                throw new RayServeException(LogUtil.format("Waiting for deployment {} to be HEALTHY, but deployment doesn't exist.", name));
            }
            if (status.getStatus() == DeploymentStatus.DEPLOYMENT_STATUS_HEALTHY) {
                isTimeout = false;
                break;
            }
            if (status.getStatus() == DeploymentStatus.DEPLOYMENT_STATUS_UNHEALTHY) {
                throw new RayServeException(LogUtil.format("Deployment {} is UNHEALTHY: {}", name, status.getMessage()));
            }
            Preconditions.checkState(status.getStatus() == DeploymentStatus.DEPLOYMENT_STATUS_UPDATING);
            LOGGER.debug("Waiting for {} to be healthy, current status: {}.", (Object)name, (Object)status.getStatus());
            try {
                Thread.sleep(CLIENT_POLLING_INTERVAL_S * 1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        if (isTimeout) {
            throw new RayServeException(LogUtil.format("Deployment {} did not become HEALTHY after {}s.", name, timeoutS));
        }
    }

    private void waitForDeploymentHealthy(String name) {
        this.waitForDeploymentHealthy(name, null);
    }

    public synchronized void shutdown() {
        if (Ray.isInitialized() && !this.shutdown) {
            Optional controllerHandle;
            ((PyActorHandle)this.controller).task(PyActorMethod.of("shutdown")).remote();
            this.waitForDeploymentsShutdown(60L);
            this.controller.kill();
            long started = System.currentTimeMillis();
            while ((controllerHandle = Ray.getActor(this.controllerName, "serve")).isPresent()) {
                long currentTime = System.currentTimeMillis();
                if (currentTime - started <= 5000L) continue;
                LOGGER.warn("Waited 5s for Serve to shutdown gracefully but the controller is still not cleaned up. You can ignore this warning if you are shutting down the Ray cluster.");
                break;
            }
            this.shutdown = true;
        }
    }

    private void waitForDeploymentsShutdown(long timeoutS) {
        long start = System.currentTimeMillis();
        List<DeploymentStatusInfo> deploymentStatuses = null;
        while (System.currentTimeMillis() - start < timeoutS * 1000L) {
            StatusOverview statusOverview = this.getServeStatus();
            if (statusOverview == null || statusOverview.getDeploymentStatuses() == null || statusOverview.getDeploymentStatuses().getDeploymentStatusInfosList() == null || statusOverview.getDeploymentStatuses().getDeploymentStatusInfosList().isEmpty()) {
                return;
            }
            deploymentStatuses = statusOverview.getDeploymentStatuses().getDeploymentStatusInfosList();
            LOGGER.debug("Waiting for shutdown, {} deployments still alive.", (Object)deploymentStatuses.size());
            try {
                Thread.sleep(CLIENT_POLLING_INTERVAL_S * 1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        ArrayList<String> liveNames = new ArrayList<String>();
        if (deploymentStatuses != null) {
            for (DeploymentStatusInfo status : deploymentStatuses) {
                liveNames.add(status.getName());
            }
        }
        throw new RayServeException(LogUtil.format("Shutdown didn't complete after {}s. Deployments still alive: {}.", timeoutS, liveNames));
    }

    public String getRootUrl() {
        return this.rootUrl;
    }

    public io.ray.serve.deployment.DeploymentRoute getDeploymentInfo(String name) {
        return io.ray.serve.deployment.DeploymentRoute.fromProtoBytes((byte[])((PyActorHandle)this.controller).task(PyActorMethod.of("get_deployment_info"), name).remote().get());
    }

    public Map<String, io.ray.serve.deployment.DeploymentRoute> listDeployments() {
        DeploymentRouteList deploymentRouteList = ServeProtoUtil.bytesToProto((byte[])((PyActorHandle)this.controller).task(PyActorMethod.of("list_deployments")).remote().get(), DeploymentRouteList::parseFrom);
        if (deploymentRouteList == null || deploymentRouteList.getDeploymentRoutesList() == null) {
            return Collections.emptyMap();
        }
        HashMap<String, io.ray.serve.deployment.DeploymentRoute> deploymentRoutes = new HashMap<String, io.ray.serve.deployment.DeploymentRoute>(deploymentRouteList.getDeploymentRoutesList().size());
        for (DeploymentRoute deploymentRoute : deploymentRouteList.getDeploymentRoutesList()) {
            deploymentRoutes.put(deploymentRoute.getDeploymentInfo().getName(), io.ray.serve.deployment.DeploymentRoute.fromProto(deploymentRoute));
        }
        return deploymentRoutes;
    }

    public void deleteDeployment(String name, boolean blocking) {
        ((PyActorHandle)this.controller).task(PyActorMethod.of("delete_deployment")).remote();
        if (blocking) {
            this.waitForDeploymentDeleted(name, 60L);
        }
    }

    private void waitForDeploymentDeleted(String name, long timeoutS) {
        DeploymentStatusInfo status;
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start < timeoutS * 1000L && (status = this.getDeploymentStatus(name)) != null) {
            LOGGER.debug("Waiting for {} to be deleted, current status: {}.", (Object)name, (Object)status);
            try {
                Thread.sleep(CLIENT_POLLING_INTERVAL_S * 1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        throw new RayServeException(LogUtil.format("Deployment {} wasn't deleted after {}s.", name, timeoutS));
    }

    private StatusOverview getServeStatus() {
        return ServeProtoUtil.bytesToProto((byte[])((PyActorHandle)this.controller).task(PyActorMethod.of("get_serve_status")).remote().get(), StatusOverview::parseFrom);
    }

    private DeploymentStatusInfo getDeploymentStatus(String name) {
        return ServeProtoUtil.bytesToProto((byte[])((PyActorHandle)this.controller).task(PyActorMethod.of("get_deployment_status"), name).remote().get(), DeploymentStatusInfo::parseFrom);
    }

    public BaseActorHandle getController() {
        return this.controller;
    }
}

