/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.scheduler.controller;

import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.WaitResult;
import io.ray.api.options.ActorCreationOptions;
import io.ray.streaming.api.Language;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.rpc.RemoteCallWorker;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerLifecycleController {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerLifecycleController.class);

    public boolean createWorkers(List<ExecutionVertex> executionVertices) {
        return this.asyncBatchExecute(this::createWorker, executionVertices);
    }

    private boolean createWorker(ExecutionVertex executionVertex) {
        LOG.info("Start to create worker actor for vertex: {} with resource: {}.", (Object)executionVertex.getVertexName(), (Object)executionVertex.getResources());
        Language language = executionVertex.getLanguage();
        ActorCreationOptions options = new ActorCreationOptions.Builder().setResources(executionVertex.getResources()).setMaxRestarts(-1).createActorCreationOptions();
        RayActor<JobWorker> actor = null;
        if (null == actor) {
            LOG.error("Create worker actor failed.");
            return false;
        }
        executionVertex.setWorkerActor(actor);
        LOG.info("Worker actor created, actor: {}, vertex: {}.", (Object)executionVertex.getWorkerActorId(), (Object)executionVertex.getVertexName());
        return true;
    }

    public boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
        LOG.info("Begin initiating workers: {}.", (Object)vertexToContextMap);
        long startTime = System.currentTimeMillis();
        HashMap rayObjects = new HashMap();
        vertexToContextMap.entrySet().forEach(entry -> {
            ExecutionVertex vertex = (ExecutionVertex)entry.getKey();
            rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), (JobWorkerContext)entry.getValue()), vertex.getWorkerActorId());
        });
        ArrayList rayObjectList = new ArrayList(rayObjects.keySet());
        LOG.info("Waiting for workers' initialization.");
        WaitResult result = Ray.wait(rayObjectList, rayObjectList.size(), timeout);
        if (result.getReady().size() != rayObjectList.size()) {
            LOG.error("Initializing workers timeout[{} ms].", (Object)timeout);
            return false;
        }
        LOG.info("Finished waiting workers' initialization.");
        LOG.info("Workers initialized. Cost {} ms.", (Object)(System.currentTimeMillis() - startTime));
        return true;
    }

    public boolean startWorkers(ExecutionGraph executionGraph, int timeout) {
        LOG.info("Begin starting workers.");
        long startTime = System.currentTimeMillis();
        ArrayList rayObjects = new ArrayList();
        executionGraph.getSourceActors().forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor)));
        executionGraph.getNonSourceActors().forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor)));
        WaitResult result = Ray.wait(rayObjects, rayObjects.size(), timeout);
        if (result.getReady().size() != rayObjects.size()) {
            LOG.error("Starting workers timeout[{} ms].", (Object)timeout);
            return false;
        }
        LOG.info("Workers started. Cost {} ms.", (Object)(System.currentTimeMillis() - startTime));
        return true;
    }

    public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
        return this.asyncBatchExecute(this::destroyWorker, executionVertices);
    }

    private boolean destroyWorker(ExecutionVertex executionVertex) {
        RayActor<JobWorker> rayActor = executionVertex.getWorkerActor();
        LOG.info("Begin destroying worker[vertex={}, actor={}].", (Object)executionVertex.getVertexName(), (Object)rayActor.getId());
        boolean destroyResult = RemoteCallWorker.shutdownWithoutReconstruction(rayActor);
        if (!destroyResult) {
            LOG.error("Failed to destroy JobWorker[{}]'s actor: {}.", (Object)executionVertex.getVertexName(), (Object)rayActor);
            return false;
        }
        LOG.info("Worker destroyed, actor: {}.", (Object)rayActor);
        return true;
    }

    private boolean asyncBatchExecute(Function<ExecutionVertex, Boolean> operation, List<ExecutionVertex> executionVertices) {
        Object asyncContext = Ray.getAsyncContext();
        List futureResults = executionVertices.stream().map(vertex -> CompletableFuture.supplyAsync(() -> {
            Ray.setAsyncContext(asyncContext);
            return (Boolean)operation.apply((ExecutionVertex)vertex);
        })).collect(Collectors.toList());
        List succeeded = futureResults.stream().map(CompletableFuture::join).collect(Collectors.toList());
        if (succeeded.stream().anyMatch(x -> x == false)) {
            LOG.error("Not all futures return true, check ResourceManager'log the detail.");
            return false;
        }
        return true;
    }
}

