/*
 * Decompiled with CFR 0.152.
 */
package io.ray.serve.router;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.PyActorHandle;
import io.ray.api.Ray;
import io.ray.api.call.PyActorTaskCaller;
import io.ray.api.function.PyActorMethod;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.DeploymentTargetInfo;
import io.ray.serve.replica.RayServeWrappedReplica;
import io.ray.serve.router.Query;
import io.ray.serve.util.CollectionUtil;
import io.ray.shaded.com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the replica actors of a deployment and dispatches queries to them.
 *
 * <p>Replica membership is refreshed through {@link #updateWorkerReplicas(Object)}; queries are
 * dispatched to a randomly chosen replica through {@link #assignReplica(Query)}.
 *
 * <p>Membership updates are serialized by {@code synchronized}; the handle and in-flight maps are
 * {@link ConcurrentHashMap}s so that query assignment can read them without taking that lock.
 */
public class ReplicaSet {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaSet.class);

    /** Maximum number of short sleeps performed while waiting for the first membership update. */
    private static final int MAX_WAIT_ATTEMPTS = 50;

    /**
     * Length of each sleep, in microseconds.
     * NOTE(review): 50 x 20us is only about 1ms in total -- confirm microseconds is intended.
     */
    private static final long WAIT_INTERVAL_MICROS = 20L;

    /** Namespace passed to {@code Ray.getActor} when resolving replica actors by name. */
    private static final String ACTOR_NAMESPACE = "serve";

    /** Replica actor name -> set of object refs associated with that replica. */
    private final Map<String, Set<ObjectRef<Object>>> inFlightQueries = new ConcurrentHashMap<String, Set<ObjectRef<Object>>>();

    /** Replica actor name -> handle of that actor. */
    private final Map<String, BaseActorHandle> allActorHandles = new ConcurrentHashMap<String, BaseActorHandle>();

    /**
     * Set once {@link #updateWorkerReplicas(Object)} has run at least once. It is volatile because
     * it is written under the instance lock but polled by {@link #tryAssignReplica(Query)} without
     * holding that lock; without volatile the polling thread is not guaranteed to see the write.
     */
    private volatile boolean hasPullReplica = false;

    /**
     * Creates an empty replica set.
     *
     * @param deploymentName name of the deployment; currently not used by this class
     */
    public ReplicaSet(String deploymentName) {
    }

    /**
     * Reconciles the tracked replicas with the replica names carried by the given target info.
     *
     * <p>Names that are not tracked yet are resolved to actor handles and added; names that are no
     * longer listed are dropped together with their in-flight bookkeeping. A name whose actor
     * cannot be resolved is only logged and stays untracked, so it is looked up again on the next
     * update. A {@code null} argument leaves the membership untouched. In every case the set is
     * marked as having received an update.
     *
     * @param deploymentTargetInfo a {@link DeploymentTargetInfo} instance, or {@code null}
     * @throws ClassCastException if the argument is non-null and not a {@link DeploymentTargetInfo}
     */
    public synchronized void updateWorkerReplicas(Object deploymentTargetInfo) {
        if (deploymentTargetInfo != null) {
            HashSet<String> actorNameSet = new HashSet<String>(((DeploymentTargetInfo) deploymentTargetInfo).getReplicaNamesList());
            // Copy the differences: Sets.difference returns live views, and the maps are mutated below.
            HashSet<String> added = new HashSet<String>(Sets.difference(actorNameSet, this.inFlightQueries.keySet()));
            HashSet<String> removed = new HashSet<String>(Sets.difference(this.inFlightQueries.keySet(), actorNameSet));
            added.forEach(name -> {
                Optional<BaseActorHandle> handleOptional = Ray.getActor(name, ACTOR_NAMESPACE);
                if (handleOptional.isPresent()) {
                    this.allActorHandles.put(name, handleOptional.get());
                    this.inFlightQueries.put(name, Sets.newConcurrentHashSet());
                } else {
                    LOGGER.warn("Can not get actor handle. actor name is {}", name);
                }
            });
            removed.forEach(this.inFlightQueries::remove);
            removed.forEach(this.allActorHandles::remove);
            if (!added.isEmpty() || !removed.isEmpty()) {
                LOGGER.info("ReplicaSet: +{}, -{} replicas.", added.size(), removed.size());
            }
        }
        this.hasPullReplica = true;
    }

    /**
     * Submits the query to one of the tracked replicas.
     *
     * @param query the query to dispatch
     * @return the object ref of the remote call
     * @throws RayServeException if no replica is currently tracked
     */
    public ObjectRef<Object> assignReplica(Query query) {
        return this.tryAssignReplica(query);
    }

    /**
     * Waits briefly for the first membership update, picks a replica uniformly at random and
     * invokes its request handler remotely.
     *
     * @param query the query to dispatch
     * @return the object ref of the remote call
     * @throws RayServeException if no replica is tracked after the wait
     */
    private ObjectRef<Object> tryAssignReplica(Query query) {
        this.awaitFirstReplicaUpdate();
        // Snapshot the handles so that the size and the indexed read below see the same list.
        ArrayList<BaseActorHandle> handles = new ArrayList<BaseActorHandle>(this.allActorHandles.values());
        if (CollectionUtil.isEmpty(handles)) {
            throw new RayServeException("ReplicaSet found no replica.");
        }
        int randomIndex = RandomUtils.nextInt(0, handles.size());
        BaseActorHandle replica = handles.get(randomIndex);
        LOGGER.debug("Assigned query {} to replica {}.", query.getMetadata().getRequestId(), replica);
        if (replica instanceof PyActorHandle) {
            // Python replicas take the serialized metadata followed by the individual arguments.
            Object[] args = Stream.concat(Stream.of(query.getMetadata().toByteArray()), Arrays.stream((Object[]) query.getArgs())).toArray();
            PyActorTaskCaller<Object> pyCaller = new PyActorTaskCaller<Object>((PyActorHandle) replica, PyActorMethod.of("handle_request_from_java"), args);
            return pyCaller.remote();
        }
        // Every handle that is not a PyActorHandle is treated as a Java RayServeWrappedReplica actor.
        @SuppressWarnings("unchecked")
        ActorHandle<RayServeWrappedReplica> javaReplica = (ActorHandle<RayServeWrappedReplica>) replica;
        return javaReplica.task(RayServeWrappedReplica::handleRequest, query.getMetadata().toByteArray(), query.getArgs()).remote();
    }

    /**
     * Polls until {@link #updateWorkerReplicas(Object)} has run once or the attempt budget is used
     * up. If the thread is interrupted, the interrupt flag is restored and the wait ends early.
     */
    private void awaitFirstReplicaUpdate() {
        for (int attempt = 0; !this.hasPullReplica && attempt < MAX_WAIT_ATTEMPTS; ++attempt) {
            try {
                TimeUnit.MICROSECONDS.sleep(WAIT_INTERVAL_MICROS);
            } catch (InterruptedException e) {
                // Re-assert the interrupt for callers; further sleeps would fail immediately anyway.
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while waiting for the first replica update.", e);
                return;
            }
        }
    }

    /**
     * Returns the in-flight bookkeeping, keyed by replica actor name.
     *
     * @return the live internal map (not a copy); changes made through it affect this replica set
     */
    public Map<String, Set<ObjectRef<Object>>> getInFlightQueries() {
        return this.inFlightQueries;
    }
}

