/*
 * Decompiled with CFR 0.152.
 */
package org.ray.runtime;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.RayPyActor;
import org.ray.api.WaitResult;
import org.ray.api.function.RayFunc;
import org.ray.api.id.ObjectId;
import org.ray.api.id.UniqueId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.CallOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtimecontext.RuntimeContext;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.functionmanager.FunctionManager;
import org.ray.runtime.generated.Common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RayMultiWorkerNativeRuntime
implements RayRuntime {
    private static final Logger LOGGER = LoggerFactory.getLogger(RayMultiWorkerNativeRuntime.class);
    private final FunctionManager functionManager;
    private final int numWorkers;
    private final Thread[] threads;
    private final RayNativeRuntime[] runtimes;
    private final ThreadLocal<RayNativeRuntime> currentThreadRuntime = new ThreadLocal();

    public RayMultiWorkerNativeRuntime(RayConfig rayConfig, FunctionManager functionManager) {
        this.functionManager = functionManager;
        Preconditions.checkState(rayConfig.runMode == RunMode.CLUSTER && rayConfig.workerMode == Common.WorkerType.WORKER);
        Preconditions.checkState(rayConfig.numWorkersPerProcess > 0, "numWorkersPerProcess must be greater than 0.");
        this.numWorkers = rayConfig.numWorkersPerProcess;
        this.runtimes = new RayNativeRuntime[this.numWorkers];
        this.threads = new Thread[this.numWorkers];
        LOGGER.info("Starting {} workers.", (Object)this.numWorkers);
        for (int i = 0; i < this.numWorkers; ++i) {
            int workerIndex = i;
            this.threads[i] = new Thread(() -> {
                RayNativeRuntime runtime;
                this.runtimes[workerIndex] = runtime = new RayNativeRuntime(rayConfig, functionManager);
                this.currentThreadRuntime.set(runtime);
                runtime.run();
            });
        }
    }

    public void run() {
        int i;
        for (i = 0; i < this.numWorkers; ++i) {
            this.threads[i].start();
        }
        for (i = 0; i < this.numWorkers; ++i) {
            try {
                this.threads[i].join();
                continue;
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdown() {
        int i;
        for (i = 0; i < this.numWorkers; ++i) {
            this.runtimes[i].shutdown();
        }
        for (i = 0; i < this.numWorkers; ++i) {
            try {
                this.threads[i].join();
                continue;
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public RayNativeRuntime getCurrentRuntime() {
        RayNativeRuntime currentRuntime = this.currentThreadRuntime.get();
        Preconditions.checkNotNull(currentRuntime, "RayRuntime is not set on current thread. If you want to use Ray API in your own threads, please wrap your `Runnable`s or `Callable`s with `Ray.wrapRunnable` or `Ray.wrapCallable`.");
        return currentRuntime;
    }

    @Override
    public <T> RayObject<T> put(T obj) {
        return this.getCurrentRuntime().put(obj);
    }

    @Override
    public <T> T get(ObjectId objectId) {
        return this.getCurrentRuntime().get(objectId);
    }

    @Override
    public <T> List<T> get(List<ObjectId> objectIds) {
        return this.getCurrentRuntime().get(objectIds);
    }

    @Override
    public <T> WaitResult<T> wait(List<RayObject<T>> waitList, int numReturns, int timeoutMs) {
        return this.getCurrentRuntime().wait(waitList, numReturns, timeoutMs);
    }

    @Override
    public void free(List<ObjectId> objectIds, boolean localOnly, boolean deleteCreatingTasks) {
        this.getCurrentRuntime().free(objectIds, localOnly, deleteCreatingTasks);
    }

    @Override
    public void setResource(String resourceName, double capacity, UniqueId nodeId) {
        this.getCurrentRuntime().setResource(resourceName, capacity, nodeId);
    }

    @Override
    public void killActor(RayActor<?> actor) {
        this.getCurrentRuntime().killActor(actor);
    }

    @Override
    public RayObject call(RayFunc func, Object[] args, CallOptions options) {
        return this.getCurrentRuntime().call(func, args, options);
    }

    @Override
    public RayObject call(RayFunc func, RayActor<?> actor, Object[] args) {
        return this.getCurrentRuntime().call(func, actor, args);
    }

    @Override
    public <T> RayActor<T> createActor(RayFunc actorFactoryFunc, Object[] args, ActorCreationOptions options) {
        return this.getCurrentRuntime().createActor(actorFactoryFunc, args, options);
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        return this.getCurrentRuntime().getRuntimeContext();
    }

    @Override
    public RayObject callPy(String moduleName, String functionName, Object[] args, CallOptions options) {
        return this.getCurrentRuntime().callPy(moduleName, functionName, args, options);
    }

    @Override
    public RayObject callPy(RayPyActor pyActor, String functionName, Object[] args) {
        return this.getCurrentRuntime().callPy(pyActor, functionName, args);
    }

    @Override
    public RayPyActor createPyActor(String moduleName, String className, Object[] args, ActorCreationOptions options) {
        return this.getCurrentRuntime().createPyActor(moduleName, className, args, options);
    }

    @Override
    public Object getAsyncContext() {
        return this.getCurrentRuntime();
    }

    @Override
    public void setAsyncContext(Object asyncContext) {
        this.currentThreadRuntime.set((RayNativeRuntime)asyncContext);
    }

    @Override
    public Runnable wrapRunnable(Runnable runnable) {
        Object asyncContext = this.getAsyncContext();
        return () -> {
            this.setAsyncContext(asyncContext);
            runnable.run();
        };
    }

    @Override
    public Callable wrapCallable(Callable callable) {
        Object asyncContext = this.getAsyncContext();
        return () -> {
            this.setAsyncContext(asyncContext);
            return callable.call();
        };
    }
}

