/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.schedule;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.runtime.cluster.ResourceManager;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.schedule.TaskAssigner;
import org.ray.streaming.runtime.schedule.TaskAssignerImpl;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.runtime.worker.context.WorkerContext;
import org.ray.streaming.schedule.JobScheduler;

public class JobSchedulerImpl
implements JobScheduler {
    private JobGraph jobGraph;
    private Map<String, String> jobConfig;
    private ResourceManager resourceManager = new ResourceManager();
    private TaskAssigner taskAssigner = new TaskAssignerImpl();

    @Override
    public void schedule(JobGraph jobGraph, Map<String, String> jobConfig) {
        this.jobConfig = jobConfig;
        this.jobGraph = jobGraph;
        System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
        Ray.init();
        List<RayActor<JobWorker>> workers = this.resourceManager.createWorkers(this.getPlanWorker());
        ExecutionGraph executionGraph = this.taskAssigner.assign(this.jobGraph, workers);
        List<ExecutionNode> executionNodes = executionGraph.getExecutionNodeList();
        ArrayList waits = new ArrayList();
        for (ExecutionNode executionNode : executionNodes) {
            List<ExecutionTask> executionTasks = executionNode.getExecutionTasks();
            for (ExecutionTask executionTask : executionTasks) {
                int taskId = executionTask.getTaskId();
                RayActor<JobWorker> streamWorker = executionTask.getWorker();
                waits.add(Ray.call(JobWorker::init, streamWorker, new WorkerContext(taskId, executionGraph, jobConfig)));
            }
        }
        Ray.wait(waits);
    }

    private int getPlanWorker() {
        List<JobVertex> jobVertexList = this.jobGraph.getJobVertexList();
        return jobVertexList.stream().map(JobVertex::getParallelism).reduce(0, Integer::sum);
    }
}

