/*
 * Decompiled with CFR 0.152.
 */
package io.ray.serve.replica;

import io.ray.api.BaseActorHandle;
import io.ray.runtime.metric.Count;
import io.ray.runtime.metric.Gauge;
import io.ray.runtime.metric.Histogram;
import io.ray.runtime.metric.Metrics;
import io.ray.serve.api.Serve;
import io.ray.serve.common.Constants;
import io.ray.serve.config.DeploymentConfig;
import io.ray.serve.context.ContextUtil;
import io.ray.serve.deployment.DeploymentId;
import io.ray.serve.deployment.DeploymentVersion;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.RequestMetadata;
import io.ray.serve.metrics.RayServeMetrics;
import io.ray.serve.replica.RayServeReplica;
import io.ray.serve.router.Query;
import io.ray.serve.util.MessageFormatter;
import io.ray.serve.util.ReflectUtil;
import io.ray.shaded.com.google.common.collect.ImmutableMap;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RayServeReplicaImpl
implements RayServeReplica {
    private static final Logger LOGGER = LoggerFactory.getLogger(RayServeReplicaImpl.class);
    private String deploymentName;
    private DeploymentId deploymentId;
    private String replicaTag;
    private DeploymentConfig config;
    private AtomicInteger numOngoingRequests = new AtomicInteger();
    private Object callable;
    private Count requestCounter;
    private Count errorCounter;
    private Count restartCounter;
    private Histogram processingLatencyTracker;
    private Gauge numProcessingItems;
    private DeploymentVersion version;
    private boolean isDeleted = false;
    private final Method checkHealthMethod;
    private final Method callMethod;

    public RayServeReplicaImpl(Object callable, DeploymentConfig deploymentConfig, DeploymentVersion version, BaseActorHandle actorHandle, String appName) {
        this.deploymentName = Serve.getReplicaContext().getDeploymentName();
        this.deploymentId = new DeploymentId(this.deploymentName, appName);
        this.replicaTag = Serve.getReplicaContext().getReplicaTag();
        this.callable = callable;
        this.config = deploymentConfig;
        this.version = version;
        this.checkHealthMethod = this.getRunnerMethod("checkHealth", null, true);
        this.callMethod = this.getRunnerMethod("call", new Object[]{new Object()}, true);
        this.registerMetrics();
    }

    private void registerMetrics() {
        RayServeMetrics.execute(() -> {
            this.requestCounter = (Count)((Metrics.CountBuilder)((Metrics.CountBuilder)((Metrics.CountBuilder)((Metrics.CountBuilder)Metrics.count().name(RayServeMetrics.SERVE_DEPLOYMENT_REQUEST_COUNTER.getName())).description(RayServeMetrics.SERVE_DEPLOYMENT_REQUEST_COUNTER.getDescription())).unit("")).tags(ImmutableMap.of("deployment", this.deploymentName, "replica", this.replicaTag))).register();
        });
        RayServeMetrics.execute(() -> {
            this.errorCounter = (Count)((Metrics.CountBuilder)((Metrics.CountBuilder)((Metrics.CountBuilder)((Metrics.CountBuilder)Metrics.count().name(RayServeMetrics.SERVE_DEPLOYMENT_ERROR_COUNTER.getName())).description(RayServeMetrics.SERVE_DEPLOYMENT_ERROR_COUNTER.getDescription())).unit("")).tags(ImmutableMap.of("deployment", this.deploymentName, "replica", this.replicaTag))).register();
        });
        RayServeMetrics.execute(() -> {
            this.restartCounter = (Count)((Metrics.CountBuilder)((Metrics.CountBuilder)((Metrics.CountBuilder)((Metrics.CountBuilder)Metrics.count().name(RayServeMetrics.SERVE_DEPLOYMENT_REPLICA_STARTS.getName())).description(RayServeMetrics.SERVE_DEPLOYMENT_REPLICA_STARTS.getDescription())).unit("")).tags(ImmutableMap.of("deployment", this.deploymentName, "replica", this.replicaTag))).register();
        });
        RayServeMetrics.execute(() -> {
            this.processingLatencyTracker = (Histogram)((Metrics.HistogramBuilder)((Metrics.HistogramBuilder)((Metrics.HistogramBuilder)((Metrics.HistogramBuilder)Metrics.histogram().name(RayServeMetrics.SERVE_DEPLOYMENT_PROCESSING_LATENCY_MS.getName())).description(RayServeMetrics.SERVE_DEPLOYMENT_PROCESSING_LATENCY_MS.getDescription())).unit("")).boundaries(Constants.DEFAULT_LATENCY_BUCKET_MS).tags(ImmutableMap.of("deployment", this.deploymentName, "replica", this.replicaTag))).register();
        });
        RayServeMetrics.execute(() -> {
            this.numProcessingItems = (Gauge)((Metrics.GaugeBuilder)((Metrics.GaugeBuilder)((Metrics.GaugeBuilder)((Metrics.GaugeBuilder)Metrics.gauge().name(RayServeMetrics.SERVE_REPLICA_PROCESSING_QUERIES.getName())).description(RayServeMetrics.SERVE_REPLICA_PROCESSING_QUERIES.getDescription())).unit("")).tags(ImmutableMap.of("deployment", this.deploymentName, "replica", this.replicaTag))).register();
        });
        RayServeMetrics.execute(() -> this.restartCounter.inc(1.0));
    }

    @Override
    public Object handleRequest(Object requestMetadata, Object requestArgs) {
        long startTime = System.currentTimeMillis();
        Query request = new Query((RequestMetadata)requestMetadata, requestArgs);
        LOGGER.debug("Replica {} received request {}", (Object)this.replicaTag, (Object)request.getMetadata().getRequestId());
        this.numOngoingRequests.incrementAndGet();
        RayServeMetrics.execute(() -> this.numProcessingItems.update(this.numOngoingRequests.get()));
        Object result = this.invokeSingle(request);
        this.numOngoingRequests.decrementAndGet();
        long requestTimeMs = System.currentTimeMillis() - startTime;
        LOGGER.debug("Replica {} finished request {} in {}ms", this.replicaTag, request.getMetadata().getRequestId(), requestTimeMs);
        return result;
    }

    private Object invokeSingle(Query requestItem) {
        long start = System.currentTimeMillis();
        Method methodToCall = null;
        try {
            ContextUtil.setRequestContext(null, requestItem.getMetadata().getRequestId(), null, null);
            LOGGER.debug("Replica {} started executing request {}", (Object)this.replicaTag, (Object)requestItem.getMetadata().getRequestId());
            Object[] args = this.parseRequestItem(requestItem);
            methodToCall = args.length == 1 && this.callMethod != null ? this.callMethod : this.getRunnerMethod(requestItem.getMetadata().getCallMethod(), args, false);
            Object result = methodToCall.invoke(this.callable, args);
            RayServeMetrics.execute(() -> this.requestCounter.inc(1.0));
            Object object = result;
            return object;
        }
        catch (Throwable e) {
            RayServeMetrics.execute(() -> this.errorCounter.inc(1.0));
            throw new RayServeException(MessageFormatter.format("Replica {} failed to invoke method {}", this.replicaTag, methodToCall == null ? "unknown" : methodToCall.getName()), e);
        }
        finally {
            RayServeMetrics.execute(() -> this.processingLatencyTracker.update(System.currentTimeMillis() - start));
            ContextUtil.clean();
        }
    }

    private Object[] parseRequestItem(Query requestItem) {
        if (requestItem.getArgs() == null) {
            return new Object[0];
        }
        if (requestItem.getArgs() instanceof Object[]) {
            return (Object[])requestItem.getArgs();
        }
        return new Object[]{requestItem.getArgs()};
    }

    private Method getRunnerMethod(String methodName, Object[] args, boolean isNullable) {
        try {
            return ReflectUtil.getMethod(this.callable.getClass(), methodName, args);
        }
        catch (NoSuchMethodException e) {
            String errMsg = MessageFormatter.format("Tried to call a method {} that does not exist. Available methods: {}", methodName, ReflectUtil.getMethodStrings(this.callable.getClass()));
            if (isNullable) {
                LOGGER.warn(errMsg);
                return null;
            }
            LOGGER.error(errMsg, e);
            throw new RayServeException(errMsg, e);
        }
    }

    @Override
    public synchronized boolean prepareForShutdown() {
        while (true) {
            try {
                Thread.sleep((long)(this.config.getGracefulShutdownWaitLoopS() * 1000.0));
            }
            catch (InterruptedException e) {
                LOGGER.error("Replica {} was interrupted in sheep when draining pending queries", (Object)this.replicaTag);
            }
            int numOngoingRequest = this.getNumOngoingRequests();
            if (numOngoingRequest <= 0) break;
            LOGGER.info("Waiting for an additional {}s to shut down because there are {} ongoing requests.", (Object)this.config.getGracefulShutdownWaitLoopS(), (Object)numOngoingRequest);
        }
        LOGGER.info("Graceful shutdown complete; replica exiting.");
        try {
            if (!this.isDeleted) {
                ReflectUtil.getMethod(this.callable.getClass(), "del", new Object[0]).invoke(this.callable, new Object[0]);
            }
        }
        catch (NoSuchMethodException e) {
            LOGGER.warn("Deployment {} has no del method.", (Object)this.deploymentName);
        }
        catch (Throwable e) {
            LOGGER.error("Exception during graceful shutdown of replica.");
        }
        finally {
            this.isDeleted = true;
        }
        return true;
    }

    @Override
    public int getNumOngoingRequests() {
        return this.numOngoingRequests.get();
    }

    @Override
    public DeploymentVersion reconfigure(byte[] deploymentConfigBytes) {
        DeploymentVersion deploymentVersion;
        this.config = DeploymentConfig.fromProtoBytes(deploymentConfigBytes);
        Object userConfig = this.config.getUserConfig();
        this.version = deploymentVersion = new DeploymentVersion(this.version.getCodeVersion(), this.config, this.version.getRayActorOptions());
        if (userConfig != null) {
            this.updateUserConfig(userConfig);
        }
        return this.version;
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void updateUserConfig(Object userConfig) {
        LOGGER.info("Replica {} of deployment {} reconfigure userConfig: {}", this.replicaTag, this.deploymentName, userConfig);
        try {
            ReflectUtil.getMethod(this.callable.getClass(), "reconfigure", userConfig).invoke(this.callable, userConfig);
        }
        catch (NoSuchMethodException e) {
            try {
                String errMsg = MessageFormatter.format("userConfig specified but deployment {} missing {} method", this.deploymentId, "reconfigure");
                LOGGER.error(errMsg);
                throw new RayServeException(errMsg, e);
                catch (Throwable e2) {
                    errMsg = MessageFormatter.format("Replica {} of deployment {} failed to reconfigure userConfig {}", this.replicaTag, this.deploymentId, userConfig);
                    LOGGER.error(errMsg);
                    throw new RayServeException(errMsg, e2);
                }
            }
            catch (Throwable throwable) {
                LOGGER.info("Replica {} of deployment {} finished reconfiguring userConfig: {}", this.replicaTag, this.deploymentId, userConfig);
                throw throwable;
            }
        }
        LOGGER.info("Replica {} of deployment {} finished reconfiguring userConfig: {}", this.replicaTag, this.deploymentId, userConfig);
    }

    public DeploymentVersion getVersion() {
        return this.version;
    }

    @Override
    public boolean checkHealth() {
        boolean result;
        block6: {
            if (this.checkHealthMethod == null) {
                return true;
            }
            result = true;
            try {
                LOGGER.info("Replica {} of deployment {} check health of {}", this.replicaTag, this.deploymentName, this.callable.getClass().getName());
                Object isHealthy = this.checkHealthMethod.invoke(this.callable, new Object[0]);
                if (!(isHealthy instanceof Boolean)) {
                    LOGGER.error("The health check result {} of {} in replica {} of deployment {} is illegal.", isHealthy == null ? "null" : isHealthy.getClass().getName() + ":" + isHealthy, this.callable.getClass().getName(), this.replicaTag, this.deploymentName);
                    result = false;
                    break block6;
                }
                result = (Boolean)isHealthy;
            }
            catch (Throwable e) {
                try {
                    LOGGER.error("Replica {} of deployment {} failed to check health of {}", this.replicaTag, this.deploymentName, this.callable.getClass().getName(), e);
                    result = false;
                }
                catch (Throwable throwable) {
                    LOGGER.info("The health check result of {} in replica {} of deployment {} is {}.", this.callable.getClass().getName(), this.replicaTag, this.deploymentName, result);
                    throw throwable;
                }
                LOGGER.info("The health check result of {} in replica {} of deployment {} is {}.", this.callable.getClass().getName(), this.replicaTag, this.deploymentName, result);
            }
        }
        LOGGER.info("The health check result of {} in replica {} of deployment {} is {}.", this.callable.getClass().getName(), this.replicaTag, this.deploymentName, result);
        return result;
    }

    public Object getCallable() {
        return this.callable;
    }
}

