/*
 * Decompiled with CFR 0.152.
 */
package org.ray.runtime;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.concurrent.Callable;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.RayPyActor;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.function.RayFunc;
import org.ray.api.function.RayFuncVoid;
import org.ray.api.id.JobId;
import org.ray.api.id.ObjectId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.CallOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtimecontext.RuntimeContext;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.RuntimeContextImpl;
import org.ray.runtime.context.WorkerContext;
import org.ray.runtime.functionmanager.FunctionDescriptor;
import org.ray.runtime.functionmanager.FunctionManager;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.runtime.functionmanager.PyFunctionDescriptor;
import org.ray.runtime.gcs.GcsClient;
import org.ray.runtime.generated.Common;
import org.ray.runtime.object.ObjectStore;
import org.ray.runtime.object.RayObjectImpl;
import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.FunctionArg;
import org.ray.runtime.task.TaskExecutor;
import org.ray.runtime.task.TaskSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link RayRuntime} that forwards object operations to an
 * {@link ObjectStore} and task/actor submission to a {@link TaskSubmitter}.
 *
 * <p>The constructor only sets {@code rayConfig}, {@code functionManager} and
 * {@code runtimeContext}. The remaining protected fields ({@code taskExecutor},
 * {@code gcsClient}, {@code objectStore}, {@code taskSubmitter}, {@code workerContext})
 * are not assigned anywhere in this class, so they must be populated elsewhere
 * (presumably by subclasses) before the methods that use them are called.
 */
public abstract class AbstractRayRuntime
implements RayRuntime {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRayRuntime.class);
    /** Function name used in the descriptor built by {@link #createPyActor}. */
    public static final String PYTHON_INIT_METHOD_NAME = "__init__";
    protected RayConfig rayConfig;
    protected TaskExecutor taskExecutor;
    protected FunctionManager functionManager;
    protected RuntimeContext runtimeContext;
    protected GcsClient gcsClient;
    protected ObjectStore objectStore;
    protected TaskSubmitter taskSubmitter;
    protected WorkerContext workerContext;

    /**
     * Stores the given configuration and function manager and creates the runtime context.
     *
     * @param rayConfig configuration returned by {@link #getRayConfig()}
     * @param functionManager manager used to resolve Java functions into descriptors
     */
    public AbstractRayRuntime(RayConfig rayConfig, FunctionManager functionManager) {
        this.rayConfig = rayConfig;
        this.functionManager = functionManager;
        // NOTE: "this" is handed out before subclass construction has finished.
        this.runtimeContext = new RuntimeContextImpl(this);
    }

    @Override
    public abstract void shutdown();

    /**
     * Puts {@code obj} into the object store.
     *
     * @param obj the value to store
     * @return a handle wrapping the id assigned by the object store
     */
    @Override
    public <T> RayObject<T> put(T obj) {
        ObjectId objectId = this.objectStore.put(obj);
        return new RayObjectImpl(objectId);
    }

    /**
     * Fetches a single object by delegating to {@link #get(List)} with a one-element list.
     *
     * @param objectId id of the object to fetch
     * @return the first (and only) element of the fetched list
     */
    @Override
    public <T> T get(ObjectId objectId) throws RayException {
        List<T> ret = this.get(ImmutableList.of(objectId));
        return ret.get(0);
    }

    /**
     * Fetches the objects with the given ids from the object store.
     *
     * @param objectIds ids of the objects to fetch
     * @return whatever {@code ObjectStore.get} returns for these ids
     */
    @Override
    public <T> List<T> get(List<ObjectId> objectIds) {
        return this.objectStore.get(objectIds);
    }

    /**
     * Forwards a delete request to the object store with the arguments unchanged.
     */
    @Override
    public void free(List<ObjectId> objectIds, boolean localOnly, boolean deleteCreatingTasks) {
        this.objectStore.delete(objectIds, localOnly, deleteCreatingTasks);
    }

    /**
     * Forwards a wait request to the object store with the arguments unchanged.
     */
    @Override
    public <T> WaitResult<T> wait(List<RayObject<T>> waitList, int numReturns, int timeoutMs) {
        return this.objectStore.wait(waitList, numReturns, timeoutMs);
    }

    /**
     * Submits a Java function as a normal (non-actor) task.
     *
     * @param func the function to invoke
     * @param args arguments for the function
     * @param options call options passed through to the task submitter
     * @return a handle to the result, or {@code null} when {@code func} is a
     *     {@link RayFuncVoid} (no return id is requested in that case)
     */
    @Override
    public RayObject call(RayFunc func, Object[] args, CallOptions options) {
        JavaFunctionDescriptor functionDescriptor = this.resolveJavaFunction(func);
        return this.callNormalFunction(functionDescriptor, args, AbstractRayRuntime.numReturnsOf(func), options);
    }

    /**
     * Submits a Java function as a task on the given actor.
     *
     * @param func the actor method to invoke
     * @param actor the target actor
     * @param args arguments for the method
     * @return a handle to the result, or {@code null} when {@code func} is a
     *     {@link RayFuncVoid} (no return id is requested in that case)
     */
    @Override
    public RayObject call(RayFunc func, RayActor<?> actor, Object[] args) {
        JavaFunctionDescriptor functionDescriptor = this.resolveJavaFunction(func);
        return this.callActorFunction(actor, functionDescriptor, args, AbstractRayRuntime.numReturnsOf(func));
    }

    /**
     * Creates a Java actor using the given factory function.
     *
     * @param actorFactoryFunc function resolved to the actor-creation descriptor
     * @param args arguments for the factory function
     * @param options actor creation options; may be {@code null}
     * @return the actor returned by the task submitter
     */
    @Override
    public <T> RayActor<T> createActor(RayFunc actorFactoryFunc, Object[] args, ActorCreationOptions options) {
        JavaFunctionDescriptor functionDescriptor = this.resolveJavaFunction(actorFactoryFunc);
        return this.createActorImpl(functionDescriptor, args, options);
    }

    /**
     * Verifies that every argument destined for Python is a {@link RayPyActor} or a byte array.
     *
     * @param args the arguments to validate
     * @throws IllegalArgumentException if any element is of another type or is {@code null}
     */
    private void checkPyArguments(Object[] args) {
        for (Object arg : args) {
            // Compute the name defensively: a null element must fail the check below
            // rather than throw a NullPointerException while building the message.
            String typeName = arg == null ? "null" : arg.getClass().getName();
            // Guava's Preconditions only substitutes "%s" placeholders (not "{}").
            Preconditions.checkArgument(arg instanceof RayPyActor || arg instanceof byte[], "Python argument can only be a RayPyActor or a byte array, not %s.", typeName);
        }
    }

    /**
     * Submits a Python function as a normal task, always requesting one return value.
     *
     * @param moduleName Python module containing the function
     * @param functionName name of the function
     * @param args arguments; each must be a {@link RayPyActor} or {@code byte[]}
     * @param options call options passed through to the task submitter
     * @return a handle to the result
     */
    @Override
    public RayObject callPy(String moduleName, String functionName, Object[] args, CallOptions options) {
        this.checkPyArguments(args);
        // A module-level function has no class, hence the empty class name.
        PyFunctionDescriptor functionDescriptor = new PyFunctionDescriptor(moduleName, "", functionName);
        return this.callNormalFunction(functionDescriptor, args, 1, options);
    }

    /**
     * Submits a method call on a Python actor, always requesting one return value.
     *
     * @param pyActor the target actor; its module and class names form the descriptor
     * @param functionName name of the method
     * @param args arguments; each must be a {@link RayPyActor} or {@code byte[]}
     * @return a handle to the result
     */
    @Override
    public RayObject callPy(RayPyActor pyActor, String functionName, Object ... args) {
        this.checkPyArguments(args);
        PyFunctionDescriptor functionDescriptor = new PyFunctionDescriptor(pyActor.getModuleName(), pyActor.getClassName(), functionName);
        return this.callActorFunction(pyActor, functionDescriptor, args, 1);
    }

    /**
     * Creates a Python actor by targeting the class's {@value #PYTHON_INIT_METHOD_NAME} method.
     *
     * @param moduleName Python module containing the class
     * @param className name of the actor class
     * @param args constructor arguments; each must be a {@link RayPyActor} or {@code byte[]}
     * @param options actor creation options; {@code jvmOptions} must be null or empty
     * @return the created actor
     */
    @Override
    public RayPyActor createPyActor(String moduleName, String className, Object[] args, ActorCreationOptions options) {
        this.checkPyArguments(args);
        PyFunctionDescriptor functionDescriptor = new PyFunctionDescriptor(moduleName, className, PYTHON_INIT_METHOD_NAME);
        return (RayPyActor)this.createActorImpl(functionDescriptor, args, options);
    }

    /**
     * Returns {@code runnable} unchanged.
     */
    @Override
    public Runnable wrapRunnable(Runnable runnable) {
        return runnable;
    }

    /**
     * Returns {@code callable} unchanged.
     */
    @Override
    public Callable wrapCallable(Callable callable) {
        return callable;
    }

    /** Looks up the descriptor of a Java function under the worker's current job id. */
    private JavaFunctionDescriptor resolveJavaFunction(RayFunc func) {
        return this.functionManager.getFunction((JobId)this.workerContext.getCurrentJobId(), func).functionDescriptor;
    }

    /** Returns the number of return values to request: 0 for {@link RayFuncVoid}, otherwise 1. */
    private static int numReturnsOf(RayFunc func) {
        return func instanceof RayFuncVoid ? 0 : 1;
    }

    /**
     * Converts the ids returned by a submission into a single result handle.
     *
     * @return {@code null} if no ids were returned, otherwise a handle for the first id
     * @throws IllegalStateException if the id count differs from {@code numReturns} or exceeds 1
     */
    private RayObject toRayObject(List<ObjectId> returnIds, int numReturns) {
        Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1, "Expected %s return id(s) (at most 1), but the task submitter returned %s.", numReturns, returnIds.size());
        if (returnIds.isEmpty()) {
            return null;
        }
        return new RayObjectImpl(returnIds.get(0));
    }

    /** Wraps the arguments (not as a direct call) and submits a normal task. */
    private RayObject callNormalFunction(FunctionDescriptor functionDescriptor, Object[] args, int numReturns, CallOptions options) {
        List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, functionDescriptor.getLanguage(), false);
        List<ObjectId> returnIds = this.taskSubmitter.submitTask(functionDescriptor, functionArgs, numReturns, options);
        return this.toRayObject(returnIds, numReturns);
    }

    /** Wraps the arguments and submits an actor task; no call options are passed. */
    private RayObject callActorFunction(RayActor rayActor, FunctionDescriptor functionDescriptor, Object[] args, int numReturns) {
        List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, functionDescriptor.getLanguage(), this.isDirectCall(rayActor));
        List<ObjectId> returnIds = this.taskSubmitter.submitActorTask(rayActor, functionDescriptor, functionArgs, numReturns, null);
        return this.toRayObject(returnIds, numReturns);
    }

    /**
     * Validates the options, wraps the arguments and asks the task submitter to create the actor.
     *
     * @throws IllegalStateException if {@code jvmOptions} is set for a non-Java actor
     */
    private RayActor createActorImpl(FunctionDescriptor functionDescriptor, Object[] args, ActorCreationOptions options) {
        // Validate first so an invalid request fails before any argument wrapping is attempted.
        if (functionDescriptor.getLanguage() != Common.Language.JAVA && options != null) {
            Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions), "jvmOptions must be null or empty when creating a non-Java actor.");
        }
        List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, functionDescriptor.getLanguage(), false);
        return this.taskSubmitter.createActor(functionDescriptor, functionArgs, options);
    }

    /**
     * Returns the actor's direct-call flag when it is a {@link NativeRayActor}; otherwise {@code false}.
     */
    private boolean isDirectCall(RayActor rayActor) {
        if (rayActor instanceof NativeRayActor) {
            return ((NativeRayActor)rayActor).isDirectCallActor();
        }
        return false;
    }

    /** Returns the worker context field (may be unset; it is not assigned in this class). */
    public WorkerContext getWorkerContext() {
        return this.workerContext;
    }

    /** Returns the object store field (may be unset; it is not assigned in this class). */
    public ObjectStore getObjectStore() {
        return this.objectStore;
    }

    /** Returns the function manager supplied at construction. */
    public FunctionManager getFunctionManager() {
        return this.functionManager;
    }

    /** Returns the configuration supplied at construction. */
    public RayConfig getRayConfig() {
        return this.rayConfig;
    }

    /** Returns the runtime context created in the constructor. */
    @Override
    public RuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    /** Returns the GCS client field (may be unset; it is not assigned in this class). */
    public GcsClient getGcsClient() {
        return this.gcsClient;
    }
}

