/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.scheduler;

import com.google.common.collect.ImmutableList;
import io.ray.api.RayActor;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder;
import io.ray.streaming.runtime.master.scheduler.JobScheduler;
import io.ray.streaming.runtime.master.scheduler.controller.WorkerLifecycleController;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobSchedulerImpl
implements JobScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
    private StreamingConfig jobConf;
    private final JobMaster jobMaster;
    private final ResourceManager resourceManager;
    private final GraphManager graphManager;
    private final WorkerLifecycleController workerLifecycleController;

    public JobSchedulerImpl(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
        this.graphManager = jobMaster.getGraphManager();
        this.resourceManager = jobMaster.getResourceManager();
        this.workerLifecycleController = new WorkerLifecycleController();
        this.jobConf = jobMaster.getRuntimeContext().getConf();
        LOG.info("Scheduler initiated.");
    }

    @Override
    public boolean scheduleJob(ExecutionGraph executionGraph) {
        LOG.info("Begin scheduling. Job: {}.", (Object)executionGraph.getJobName());
        this.prepareResourceAndCreateWorker(executionGraph);
        this.initAndStart(executionGraph);
        return true;
    }

    protected void prepareResourceAndCreateWorker(ExecutionGraph executionGraph) {
        ImmutableList<Container> containers = this.resourceManager.getRegisteredContainers();
        this.resourceManager.assignResource(containers, executionGraph);
        LOG.info("Allocating map is: {}.", (Object)ViewBuilder.buildResourceAssignmentView(containers));
        this.createWorkers(executionGraph);
    }

    private void initAndStart(ExecutionGraph executionGraph) {
        Map<ExecutionVertex, JobWorkerContext> vertexToContextMap = this.buildWorkersContext(executionGraph);
        this.initWorkers(vertexToContextMap);
        this.initMaster();
        this.startWorkers(executionGraph);
    }

    public boolean createWorkers(ExecutionGraph executionGraph) {
        LOG.info("Begin creating workers.");
        long startTs = System.currentTimeMillis();
        boolean createResult = this.workerLifecycleController.createWorkers(executionGraph.getAllAddedExecutionVertices());
        if (createResult) {
            LOG.info("Finished creating workers. Cost {} ms.", (Object)(System.currentTimeMillis() - startTs));
            return true;
        }
        LOG.error("Failed to create workers. Cost {} ms.", (Object)(System.currentTimeMillis() - startTs));
        return false;
    }

    protected boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> vertexToContextMap) {
        boolean result;
        try {
            result = this.workerLifecycleController.initWorkers(vertexToContextMap, this.jobConf.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs());
        }
        catch (Exception e) {
            LOG.error("Failed to initiate workers.", e);
            return false;
        }
        return result;
    }

    public boolean startWorkers(ExecutionGraph executionGraph) {
        boolean result;
        try {
            result = this.workerLifecycleController.startWorkers(executionGraph, this.jobConf.masterConfig.schedulerConfig.workerStartingWaitTimeoutMs());
        }
        catch (Exception e) {
            LOG.error("Failed to start workers.", e);
            return false;
        }
        return result;
    }

    protected Map<ExecutionVertex, JobWorkerContext> buildWorkersContext(ExecutionGraph executionGraph) {
        RayActor masterActor = this.jobMaster.getJobMasterActor();
        HashMap<ExecutionVertex, JobWorkerContext> needRegistryVertexToContextMap = new HashMap<ExecutionVertex, JobWorkerContext>();
        executionGraph.getAllExecutionVertices().forEach(vertex -> {
            JobWorkerContext ctx = this.buildJobWorkerContext((ExecutionVertex)vertex, masterActor);
            needRegistryVertexToContextMap.put((ExecutionVertex)vertex, ctx);
        });
        return needRegistryVertexToContextMap;
    }

    private JobWorkerContext buildJobWorkerContext(ExecutionVertex executionVertex, RayActor<JobMaster> masterActor) {
        JobWorkerContext ctx = new JobWorkerContext(executionVertex.getWorkerActorId(), masterActor, executionVertex);
        return ctx;
    }

    public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
        boolean result;
        try {
            result = this.workerLifecycleController.destroyWorkers(executionVertices);
        }
        catch (Exception e) {
            LOG.error("Failed to destroy workers.", e);
            return false;
        }
        return result;
    }

    private void initMaster() {
        this.jobMaster.init();
    }
}

