/*
 * Decompiled with CFR 0.152.
 */
package io.ray.runtime.utils.parallelactor;

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.concurrencygroup.ConcurrencyGroup;
import io.ray.api.concurrencygroup.ConcurrencyGroupBuilder;
import io.ray.api.function.RayFunc;
import io.ray.api.function.RayFuncR;
import io.ray.api.parallelactor.ParallelActorContext;
import io.ray.api.parallelactor.ParallelActorHandle;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.functionmanager.FunctionManager;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.utils.parallelactor.ParallelActorExecutorImpl;
import io.ray.runtime.utils.parallelactor.ParallelActorHandleImpl;

public class ParallelActorContextImpl
implements ParallelActorContext {
    @Override
    public <A> ParallelActorHandle<A> createParallelActorExecutor(int parallelism, RayFuncR<A> ctorFunc) {
        ConcurrencyGroup[] concurrencyGroups = new ConcurrencyGroup[parallelism];
        for (int i = 0; i < parallelism; ++i) {
            concurrencyGroups[i] = new ConcurrencyGroupBuilder().setName(String.format("PARALLEL_INSTANCE_%d", i)).setMaxConcurrency(1).build();
        }
        FunctionManager functionManager = ((AbstractRayRuntime)Ray.internal()).getFunctionManager();
        JavaFunctionDescriptor functionDescriptor = functionManager.getFunction(ctorFunc).getFunctionDescriptor();
        ActorHandle<ParallelActorExecutorImpl> parallelExecutorHandle = Ray.actor(ParallelActorExecutorImpl::new, Integer.valueOf(parallelism), functionDescriptor).setConcurrencyGroups(concurrencyGroups).remote();
        return new ParallelActorHandleImpl(parallelism, parallelExecutorHandle);
    }

    @Override
    public <A, R> ObjectRef<R> submitTask(ParallelActorHandle<A> parallelActorHandle, int instanceId, RayFunc func, Object[] args) {
        ActorHandle<ParallelActorExecutorImpl> parallelExecutor = ((ParallelActorHandleImpl)parallelActorHandle).getExecutor();
        FunctionManager functionManager = ((AbstractRayRuntime)Ray.internal()).getFunctionManager();
        JavaFunctionDescriptor functionDescriptor = functionManager.getFunction(func).getFunctionDescriptor();
        ObjectRef<Object> ret = parallelExecutor.task(ParallelActorExecutorImpl::execute, Integer.valueOf(instanceId), functionDescriptor, args).setConcurrencyGroup(String.format("PARALLEL_INSTANCE_%d", instanceId)).remote();
        return ret;
    }
}

