/*
 * Decompiled with CFR 0.152.
 */
package org.ray.runtime.runner;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.util.FileUtil;
import org.ray.runtime.util.ResourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

public class RunManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(RunManager.class);
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("Y-M-d_H-m-s");
    private static final String WORKER_CLASS = "org.ray.runtime.runner.worker.DefaultWorker";
    private RayConfig rayConfig;
    private Random random;
    private List<Pair<String, Process>> processes;
    private static final int KILL_PROCESS_WAIT_TIMEOUT_SECONDS = 1;

    public RunManager(RayConfig rayConfig) {
        this.rayConfig = rayConfig;
        this.processes = new ArrayList<Pair<String, Process>>();
        this.random = new Random();
    }

    public void cleanup() {
        for (int i = this.processes.size() - 1; i >= 0; --i) {
            Pair<String, Process> pair = this.processes.get(i);
            this.terminateProcess(pair.getLeft(), pair.getRight());
        }
    }

    public void terminateProcess(String name, Process p) {
        int numAttempts = 0;
        while (p.isAlive()) {
            if (numAttempts == 0) {
                LOGGER.debug("Terminating process {}.", (Object)name);
                p.destroy();
            } else {
                LOGGER.debug("Terminating process {} forcibly.", (Object)name);
                p.destroyForcibly();
            }
            try {
                p.waitFor(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOGGER.warn("Got InterruptedException while waiting for process {} to be terminated.", (Object)name);
            }
            ++numAttempts;
        }
        LOGGER.info("Process {} is now terminated.", (Object)name);
    }

    public List<Process> getProcesses(String name) {
        return this.processes.stream().filter(pair -> ((String)pair.getLeft()).equals(name)).map(Pair::getRight).collect(Collectors.toList());
    }

    private void createTempDirs() {
        try {
            FileUtils.forceMkdir(new File(this.rayConfig.logDir));
            FileUtils.forceMkdir(new File(this.rayConfig.rayletSocketName).getParentFile());
            FileUtils.forceMkdir(new File(this.rayConfig.objectStoreSocketName).getParentFile());
        }
        catch (IOException e) {
            LOGGER.error("Couldn't create temp directories.", e);
            throw new RuntimeException(e);
        }
    }

    private void startProcess(List<String> command, Map<String, String> env, String name) {
        Process p;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting process {} with command: {}", (Object)name, (Object)Joiner.on(" ").join(command));
        }
        ProcessBuilder builder = new ProcessBuilder(command);
        String stdout = "";
        String stderr = "";
        if (this.rayConfig.redirectOutput) {
            int logId = this.random.nextInt(10000);
            String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
            stdout = String.format("%s/%s-%s-%05d.out", this.rayConfig.logDir, name, date, logId);
            stderr = String.format("%s/%s-%s-%05d.err", this.rayConfig.logDir, name, date, logId);
            builder.redirectOutput(new File(stdout));
            builder.redirectError(new File(stderr));
        }
        if (env != null && !env.isEmpty()) {
            builder.environment().putAll(env);
        }
        try {
            p = builder.start();
        }
        catch (IOException e) {
            LOGGER.error("Failed to start process " + name, e);
            throw new RuntimeException("Failed to start process " + name, e);
        }
        try {
            TimeUnit.MILLISECONDS.sleep(200L);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (!p.isAlive()) {
            throw new RuntimeException(String.format("Failed to start %s. Exit code: %d.", name, p.exitValue()));
        }
        this.processes.add(Pair.of(name, p));
        if (LOGGER.isInfoEnabled()) {
            String message = String.format("%s process started.", name);
            if (this.rayConfig.redirectOutput) {
                message = message + String.format(" Logs are redirected to %s and %s.", stdout, stderr);
            }
            LOGGER.info(message);
        }
    }

    public void startRayProcesses(boolean isHead) {
        LOGGER.info("Starting ray processes @ {}.", (Object)this.rayConfig.nodeIp);
        try {
            this.createTempDirs();
            if (isHead) {
                this.startRedisServer();
            }
            this.startObjectStore();
            this.startRaylet();
            LOGGER.info("All processes started @ {}.", (Object)this.rayConfig.nodeIp);
        }
        catch (Exception e) {
            this.cleanup();
            LOGGER.error("Failed to start ray processes.", e);
            throw new RuntimeException("Failed to start ray processes.", e);
        }
    }

    private void startRedisServer() {
        String primary = this.startRedisInstance(this.rayConfig.nodeIp, this.rayConfig.headRedisPort, this.rayConfig.headRedisPassword, null);
        this.rayConfig.setRedisAddress(primary);
        try (Jedis client = new Jedis("127.0.0.1", this.rayConfig.headRedisPort);){
            if (!Strings.isNullOrEmpty(this.rayConfig.headRedisPassword)) {
                client.auth(this.rayConfig.headRedisPassword);
            }
            client.set("UseRaylet", "1");
            client.set("JobCounter", "0");
            client.set("NumRedisShards", Integer.toString(this.rayConfig.numberRedisShards));
            for (int i = 0; i < this.rayConfig.numberRedisShards; ++i) {
                String shard = this.startRedisInstance(this.rayConfig.nodeIp, this.rayConfig.headRedisPort + i + 1, this.rayConfig.headRedisPassword, i);
                client.rpush("RedisShards", shard);
            }
        }
    }

    private String startRedisInstance(String ip, int port, String password, Integer shard) {
        try (FileUtil.TempFile redisServerFile = FileUtil.getTempFileFromResource("redis-server");
             FileUtil.TempFile redisModuleFile = FileUtil.getTempFileFromResource("libray_redis_module.so");){
            redisServerFile.getFile().setExecutable(true);
            ArrayList<String> command = Lists.newArrayList(redisServerFile.getFile().getAbsolutePath(), "--protected-mode", "no", "--port", String.valueOf(port), "--loglevel", "warning", "--loadmodule", redisModuleFile.getFile().getAbsolutePath());
            if (!Strings.isNullOrEmpty(password)) {
                command.add("--requirepass ");
                command.add(password);
            }
            String name = shard == null ? "redis" : "redis-" + shard;
            this.startProcess(command, null, name);
        }
        try (Jedis client = new Jedis("127.0.0.1", port);){
            if (!Strings.isNullOrEmpty(password)) {
                client.auth(password);
            }
            client.configSet("notify-keyspace-events", "Kl");
            client.set("redis_start_time", LocalDateTime.now().toString());
        }
        return ip + ":" + port;
    }

    private void startRaylet() {
        int hardwareConcurrency = Runtime.getRuntime().availableProcessors();
        int maximumStartupConcurrency = Math.max(1, Math.min(this.rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency));
        String redisPasswordOption = "";
        if (!Strings.isNullOrEmpty(this.rayConfig.headRedisPassword)) {
            redisPasswordOption = this.rayConfig.headRedisPassword;
        }
        try (FileUtil.TempFile rayletFile = FileUtil.getTempFileFromResource("raylet");){
            rayletFile.getFile().setExecutable(true);
            ImmutableList<String> command = ImmutableList.of(rayletFile.getFile().getAbsolutePath(), String.format("--raylet_socket_name=%s", this.rayConfig.rayletSocketName), String.format("--store_socket_name=%s", this.rayConfig.objectStoreSocketName), String.format("--object_manager_port=%d", 0), String.format("--node_manager_port=%d", this.rayConfig.getNodeManagerPort()), String.format("--node_ip_address=%s", this.rayConfig.nodeIp), String.format("--redis_address=%s", this.rayConfig.getRedisIp()), String.format("--redis_port=%d", this.rayConfig.getRedisPort()), String.format("--num_initial_workers=%d", 0), String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency), String.format("--static_resource_list=%s", ResourceUtil.getResourcesStringFromMap(this.rayConfig.resources)), String.format("--config_list=%s", String.join((CharSequence)",", this.rayConfig.rayletConfigParameters)), new String[]{String.format("--python_worker_command=%s", this.buildPythonWorkerCommand()), String.format("--java_worker_command=%s", this.buildWorkerCommandRaylet()), String.format("--redis_password=%s", redisPasswordOption)});
            this.startProcess(command, null, "raylet");
        }
    }

    private String concatPath(Stream<String> stream) {
        return stream.filter(s2 -> !s2.contains(" ")).collect(Collectors.joining(":"));
    }

    private String buildWorkerCommandRaylet() {
        ArrayList<String> cmd = new ArrayList<String>();
        cmd.add("java");
        cmd.add("-classpath");
        String classpath = this.concatPath(Stream.concat(this.rayConfig.classpath.stream(), Stream.of(System.getProperty("java.class.path").split(":"))));
        cmd.add(classpath);
        String libraryPath = this.concatPath(this.rayConfig.libraryPath.stream());
        cmd.add("-Djava.library.path=" + libraryPath);
        if (this.rayConfig.redirectOutput) {
            cmd.add("-Dray.logging.stdout=org.apache.log4j.varia.NullAppender");
            cmd.add("-Dray.logging.file=org.apache.log4j.FileAppender");
            int logId = this.random.nextInt(10000);
            String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
            String logFile = String.format("%s/worker-%s-%05d.out", this.rayConfig.logDir, date, logId);
            cmd.add("-Dray.logging.file.path=" + logFile);
        }
        if (!Strings.isNullOrEmpty(this.rayConfig.jobResourcePath)) {
            cmd.add("-Dray.job.resource-path=" + this.rayConfig.jobResourcePath);
        }
        cmd.add("-Dray.raylet.socket-name=" + this.rayConfig.rayletSocketName);
        cmd.add("-Dray.object-store.socket-name=" + this.rayConfig.objectStoreSocketName);
        cmd.add("-Dray.raylet.node-manager-port=" + this.rayConfig.getNodeManagerPort());
        cmd.add("-Dray.redis.address=" + this.rayConfig.getRedisAddress());
        if (!Strings.isNullOrEmpty(this.rayConfig.headRedisPassword)) {
            cmd.add("-Dray.redis.password=" + this.rayConfig.headRedisPassword);
        }
        cmd.add("-Dray.raylet.config.num_workers_per_process_java=RAY_WORKER_NUM_WORKERS_PLACEHOLDER");
        cmd.addAll(this.rayConfig.jvmParameters);
        cmd.add("RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_0");
        cmd.add(WORKER_CLASS);
        String command = Joiner.on(" ").join(cmd);
        LOGGER.debug("Worker command is: {}", (Object)command);
        return command;
    }

    private void startObjectStore() {
        try (FileUtil.TempFile plasmaStoreFile = FileUtil.getTempFileFromResource("plasma_store_server");){
            plasmaStoreFile.getFile().setExecutable(true);
            ImmutableList<String> command = ImmutableList.of(plasmaStoreFile.getFile().getAbsolutePath(), "-s", this.rayConfig.objectStoreSocketName, "-m", this.rayConfig.objectStoreSize.toString());
            this.startProcess(command, null, "plasma_store");
        }
    }

    private String buildPythonWorkerCommand() {
        if (this.rayConfig.pythonWorkerCommand == null) {
            return "";
        }
        ArrayList<String> cmd = new ArrayList<String>();
        cmd.add(this.rayConfig.pythonWorkerCommand);
        cmd.add("--node-ip-address=" + this.rayConfig.nodeIp);
        cmd.add("--object-store-name=" + this.rayConfig.objectStoreSocketName);
        cmd.add("--raylet-name=" + this.rayConfig.rayletSocketName);
        cmd.add("--redis-address=" + this.rayConfig.getRedisAddress());
        String command = cmd.stream().collect(Collectors.joining(" "));
        LOGGER.debug("python worker command: {}", (Object)command);
        return command;
    }
}

