/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.schedule;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.BaseActor;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayPyActor;
import org.ray.api.function.PyActorMethod;
import org.ray.streaming.api.Language;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.generated.RemoteCall;
import org.ray.streaming.runtime.python.GraphPbBuilder;
import org.ray.streaming.runtime.schedule.TaskAssigner;
import org.ray.streaming.runtime.schedule.TaskAssignerImpl;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.runtime.worker.context.WorkerContext;
import org.ray.streaming.schedule.JobScheduler;

/**
 * {@link JobScheduler} implementation that turns a {@link JobGraph} into an
 * {@link ExecutionGraph} via a {@link TaskAssigner} and then invokes {@code init}
 * on the worker actor of every execution task, for both Java and Python workers.
 */
public class JobSchedulerImpl implements JobScheduler {
    private JobGraph jobGraph;
    private Map<String, String> jobConfig;
    private final TaskAssigner taskAssigner = new TaskAssignerImpl();

    /**
     * Assigns the job graph to workers and triggers {@code init} on each worker.
     *
     * <p>If {@code Ray.internal()} returns {@code null}, the
     * {@code ray.raylet.config.num_workers_per_process_java} system property is set to
     * {@code "1"} and {@code Ray.init()} is called before anything else.
     *
     * @param jobGraph  the logical job graph to schedule
     * @param jobConfig job configuration handed to every worker context
     * @throws UnsupportedOperationException if an execution node reports a language other
     *         than {@code JAVA} or {@code PYTHON}
     */
    @Override
    public void schedule(JobGraph jobGraph, Map<String, String> jobConfig) {
        this.jobConfig = jobConfig;
        this.jobGraph = jobGraph;
        if (Ray.internal() == null) {
            System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
            Ray.init();
        }
        ExecutionGraph executionGraph = this.taskAssigner.assign(this.jobGraph);
        List<ExecutionNode> executionNodes = executionGraph.getExecutionNodeList();

        // The protobuf graph is needed as soon as ANY node is a Python node. The previous
        // allMatch check left executionGraphPb null for mixed Java/Python graphs, so Python
        // workers were handed a null graph; it was also vacuously true for an empty list.
        boolean hasPythonNode = executionNodes.stream()
                .anyMatch(node -> node.getLanguage() == Language.PYTHON);
        RemoteCall.ExecutionGraph executionGraphPb = null;
        if (hasPythonNode) {
            executionGraphPb = new GraphPbBuilder().buildExecutionGraphPb(executionGraph);
        }

        // Raw types are kept as in the original: the element type of the handles returned
        // by the actor calls is not imported in this file.
        List waits = new ArrayList();
        for (ExecutionNode executionNode : executionNodes) {
            for (ExecutionTask executionTask : executionNode.getExecutionTasks()) {
                int taskId = executionTask.getTaskId();
                BaseActor worker = executionTask.getWorker();
                switch (executionNode.getLanguage()) {
                    case JAVA: {
                        RayActor jobWorker = (RayActor) worker;
                        waits.add(jobWorker.call(JobWorker::init,
                                new WorkerContext(taskId, executionGraph, jobConfig)));
                        break;
                    }
                    case PYTHON: {
                        byte[] workerContextBytes =
                                this.buildPythonWorkerContext(taskId, executionGraphPb, jobConfig);
                        waits.add(((RayPyActor) worker).call(
                                new PyActorMethod<Object>("init", Object.class), workerContextBytes));
                        break;
                    }
                    default:
                        throw new UnsupportedOperationException(
                                "Unsupported language " + executionNode.getLanguage());
                }
            }
        }
        // Hand all pending init results to Ray.wait (presumably blocks until they are
        // ready -- confirm against the Ray API).
        Ray.wait(waits);
    }

    /**
     * Serializes the worker context for a Python worker.
     *
     * @param taskId           id of the task the worker will run
     * @param executionGraphPb protobuf form of the execution graph
     * @param jobConfig        job configuration copied into the context
     * @return the bytes of a {@code RemoteCall.WorkerContext} built from the arguments
     */
    private byte[] buildPythonWorkerContext(int taskId, RemoteCall.ExecutionGraph executionGraphPb, Map<String, String> jobConfig) {
        return RemoteCall.WorkerContext.newBuilder()
                .setTaskId(taskId)
                .putAllConf(jobConfig)
                .setGraph(executionGraphPb)
                .build()
                .toByteArray();
    }
}

