/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.schedule;

import io.ray.api.BaseActor;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.RayPyActor;
import io.ray.api.function.PyActorClass;
import io.ray.streaming.jobgraph.JobEdge;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobVertex;
import io.ray.streaming.runtime.core.graph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.ExecutionNode;
import io.ray.streaming.runtime.core.graph.ExecutionTask;
import io.ray.streaming.runtime.schedule.TaskAssigner;
import io.ray.streaming.runtime.worker.JobWorker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskAssigner} implementation that expands a {@link JobGraph} into an
 * {@link ExecutionGraph}: one {@link ExecutionNode} per job vertex, one
 * {@link ExecutionTask} (backed by a freshly created worker actor) per unit of
 * parallelism, and one {@link ExecutionEdge} per job edge.
 */
public class TaskAssignerImpl implements TaskAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class);

    /**
     * Builds the execution graph for the given job graph.
     *
     * <p>Task ids are assigned sequentially starting at 0 across all vertices, in
     * the order the vertices appear in {@code jobGraph.getJobVertexList()}; the
     * task index restarts at 0 for every vertex. A worker actor is created for
     * every task as a side effect of this call.
     *
     * <p>The nodes handed to the resulting graph are taken from an id-keyed
     * {@link HashMap}, so their order is that map's iteration order and is not
     * guaranteed to match the vertex list order.
     *
     * @param jobGraph the job graph to expand
     * @return the execution graph containing one node per job vertex
     * @throws NullPointerException if a job edge references a vertex id that is
     *     not present in the job graph's vertex list
     * @throws UnsupportedOperationException if a vertex uses a language other
     *     than {@code PYTHON} or {@code JAVA}
     */
    @Override
    public ExecutionGraph assign(JobGraph jobGraph) {
        List<JobVertex> jobVertices = jobGraph.getJobVertexList();
        List<JobEdge> jobEdges = jobGraph.getJobEdgeList();

        HashMap<Integer, ExecutionNode> nodesById = new HashMap<>();
        int nextTaskId = 0;
        for (JobVertex jobVertex : jobVertices) {
            ExecutionNode executionNode = new ExecutionNode(
                    jobVertex.getVertexId(), jobVertex.getParallelism(), jobVertex.getConfig());
            executionNode.setNodeType(jobVertex.getVertexType());

            List<ExecutionTask> tasks = this.createTasks(jobVertex, nextTaskId);
            // Advance by the number of tasks actually created so ids stay contiguous.
            nextTaskId += tasks.size();

            executionNode.setExecutionTasks(tasks);
            executionNode.setStreamOperator(jobVertex.getStreamOperator());
            nodesById.put(executionNode.getNodeId(), executionNode);
        }

        for (JobEdge jobEdge : jobEdges) {
            int srcNodeId = jobEdge.getSrcVertexId();
            int targetNodeId = jobEdge.getTargetVertexId();
            // Resolve both endpoints before touching either node, so a dangling edge
            // fails with a descriptive message and leaves no half-wired node behind.
            ExecutionNode srcNode = requireNode(nodesById, srcNodeId, "source", srcNodeId, targetNodeId);
            ExecutionNode targetNode = requireNode(nodesById, targetNodeId, "target", srcNodeId, targetNodeId);

            ExecutionEdge executionEdge = new ExecutionEdge(srcNodeId, targetNodeId, jobEdge.getPartition());
            srcNode.addOutputEdge(executionEdge);
            targetNode.addInputEdge(executionEdge);
        }

        List<ExecutionNode> executionNodes = new ArrayList<>(nodesById.values());
        return new ExecutionGraph(executionNodes);
    }

    /**
     * Creates the tasks of one vertex, each with its own newly created worker.
     *
     * @param jobVertex the vertex whose parallelism determines the task count
     * @param firstTaskId the id given to the first task; following tasks get consecutive ids
     * @return the created tasks, with task indexes {@code 0..parallelism-1}
     */
    private List<ExecutionTask> createTasks(JobVertex jobVertex, int firstTaskId) {
        int parallelism = jobVertex.getParallelism();
        List<ExecutionTask> tasks = new ArrayList<>();
        for (int taskIndex = 0; taskIndex < parallelism; ++taskIndex) {
            tasks.add(new ExecutionTask(firstTaskId + taskIndex, taskIndex, this.createWorker(jobVertex)));
        }
        return tasks;
    }

    /**
     * Looks up the execution node for one endpoint of an edge.
     *
     * @param nodesById execution nodes keyed by node id
     * @param nodeId the id of the endpoint to resolve
     * @param role {@code "source"} or {@code "target"}, used only in the error message
     * @param srcNodeId source vertex id of the edge, used only in the error message
     * @param targetNodeId target vertex id of the edge, used only in the error message
     * @return the node registered under {@code nodeId}
     * @throws NullPointerException if no node is registered under {@code nodeId}
     */
    private static ExecutionNode requireNode(
            HashMap<Integer, ExecutionNode> nodesById, int nodeId, String role, int srcNodeId, int targetNodeId) {
        return Objects.requireNonNull(
                nodesById.get(nodeId),
                () -> "Job edge " + srcNodeId + " -> " + targetNodeId
                        + " references unknown " + role + " vertex " + nodeId);
    }

    /**
     * Creates the worker actor that will run a task of the given vertex.
     *
     * <p>For {@code PYTHON} vertices the actor is created from the
     * {@code PyActorClass("ray.streaming.runtime.worker", "JobWorker")} descriptor;
     * for {@code JAVA} vertices a {@link JobWorker} actor is created.
     *
     * @param jobVertex the vertex whose language selects the worker kind
     * @return the handle of the newly created actor
     * @throws UnsupportedOperationException if the vertex language is neither
     *     {@code PYTHON} nor {@code JAVA}
     */
    private BaseActor createWorker(JobVertex jobVertex) {
        switch (jobVertex.getLanguage()) {
            case PYTHON: {
                RayPyActor worker = Ray.createActor(new PyActorClass("ray.streaming.runtime.worker", "JobWorker"));
                LOG.info("Created python worker {}", worker);
                return worker;
            }
            case JAVA: {
                RayActor<JobWorker> worker = Ray.createActor(JobWorker::new);
                LOG.info("Created java worker {}", worker);
                return worker;
            }
            default:
                throw new UnsupportedOperationException("Unsupported language " + jobVertex.getLanguage());
        }
    }
}

