/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.graphmanager;

import io.ray.streaming.jobgraph.JobEdge;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.master.JobRuntimeContext;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link GraphManager} implementation that turns a {@link JobGraph} into an
 * {@link ExecutionGraph} and exposes the graphs held by the job's
 * {@link JobRuntimeContext}.
 */
public class GraphManagerImpl implements GraphManager {
    private static final Logger LOG = LoggerFactory.getLogger(GraphManagerImpl.class);

    /** Runtime context that {@link #getJobGraph()} and {@link #getExecutionGraph()} read from. */
    protected final JobRuntimeContext runtimeContext;

    /**
     * Creates a graph manager bound to the given runtime context.
     *
     * @param runtimeContext context the graph getters delegate to
     */
    public GraphManagerImpl(JobRuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    /**
     * Builds an execution graph from the given job graph.
     *
     * <p>The vertex/edge structure is created first; afterwards the graph's max parallelism is
     * set to the largest parallelism among the job vertices and the job config is copied over.
     *
     * @param jobGraph logical job graph to expand
     * @return the populated execution graph
     * @throws NoSuchElementException if the job graph contains no vertices, since no max
     *     parallelism can be derived
     */
    @Override
    public ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {
        LOG.info("Begin build execution graph with job graph {}.", jobGraph);
        ExecutionGraph executionGraph = this.setupStructure(jobGraph);

        // Primitive stream avoids boxing; an explicit message replaces the bare
        // "No value present" of an unchecked Optional.get().
        int maxParallelism = jobGraph.getJobVertexList().stream()
                .mapToInt(JobVertex::getParallelism)
                .max()
                .orElseThrow(() -> new NoSuchElementException(
                        "Cannot build execution graph: job graph contains no vertices."));
        executionGraph.setMaxParallelism(maxParallelism);
        executionGraph.setJobConfig(jobGraph.getJobConfig());

        LOG.info("Build execution graph success.");
        return executionGraph;
    }

    /**
     * Creates the execution graph skeleton: one {@link ExecutionJobVertex} per job vertex, one
     * {@link ExecutionJobEdge} per job edge, and one {@link ExecutionEdge} for every
     * (source execution vertex, target execution vertex) pair of each job edge.
     *
     * @param jobGraph logical job graph to expand
     * @return execution graph with its job-vertex map set
     * @throws NullPointerException if a job edge references a vertex id that is not present in
     *     the job graph's vertex list
     */
    private ExecutionGraph setupStructure(JobGraph jobGraph) {
        ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobName());
        Map<String, String> jobConfig = jobGraph.getJobConfig();

        // LinkedHashMap keeps the job vertices in the order the job graph lists them.
        LinkedHashMap<Integer, ExecutionJobVertex> exeJobVertexMap = new LinkedHashMap<>();
        for (JobVertex jobVertex : jobGraph.getJobVertexList()) {
            exeJobVertexMap.put(
                    jobVertex.getVertexId(),
                    new ExecutionJobVertex(
                            jobVertex, jobConfig, executionGraph.getExecutionVertexIdGenerator()));
        }

        for (JobEdge jobEdge : jobGraph.getJobEdgeList()) {
            // Fail with the offending id instead of an anonymous NPE further down.
            ExecutionJobVertex source = Objects.requireNonNull(
                    exeJobVertexMap.get(jobEdge.getSrcVertexId()),
                    () -> "Job edge references unknown source vertex id "
                            + jobEdge.getSrcVertexId());
            ExecutionJobVertex target = Objects.requireNonNull(
                    exeJobVertexMap.get(jobEdge.getTargetVertexId()),
                    () -> "Job edge references unknown target vertex id "
                            + jobEdge.getTargetVertexId());

            ExecutionJobEdge executionJobEdge = new ExecutionJobEdge(source, target, jobEdge);
            source.getOutputEdges().add(executionJobEdge);
            target.getInputEdges().add(executionJobEdge);

            // Connect every execution vertex of the source to every execution vertex of the target.
            for (ExecutionVertex upstream : source.getExecutionVertices()) {
                for (ExecutionVertex downstream : target.getExecutionVertices()) {
                    ExecutionEdge executionEdge =
                            new ExecutionEdge(upstream, downstream, executionJobEdge);
                    upstream.getOutputEdges().add(executionEdge);
                    downstream.getInputEdges().add(executionEdge);
                }
            }
        }

        executionGraph.setExecutionJobVertexMap(exeJobVertexMap);
        return executionGraph;
    }

    /**
     * Returns the job graph held by the runtime context.
     *
     * @return the runtime context's job graph
     */
    @Override
    public JobGraph getJobGraph() {
        return this.runtimeContext.getJobGraph();
    }

    /**
     * Returns the execution graph held by the runtime context.
     *
     * @return the runtime context's execution graph
     */
    @Override
    public ExecutionGraph getExecutionGraph() {
        return this.runtimeContext.getExecutionGraph();
    }
}

