/*
 * Decompiled with CFR 0.152.
 */
package io.ray.runtime.runner;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.util.BinaryFileUtil;
import io.ray.runtime.util.ResourceUtil;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
 * Starts and stops the native processes (redis, gcs_server, plasma store, raylet) that back
 * a local Ray node, and keeps track of every process it launched so they can be terminated.
 *
 * <p>Instances are not synchronized; NOTE(review): presumably used from a single thread
 * during runtime start-up and shutdown — confirm against callers.
 */
public class RunManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(RunManager.class);
    private static final String WORKER_CLASS = "io.ray.runtime.runner.worker.DefaultWorker";
    private static final String SESSION_LATEST = "session_latest";
    /** Seconds to wait after each destroy attempt before escalating/retrying. */
    private static final int KILL_PROCESS_WAIT_TIMEOUT_SECONDS = 1;
    /** Grace period after launching a process before checking that it is still alive. */
    private static final long PROCESS_STARTUP_CHECK_DELAY_MS = 1000L;

    private final RayConfig rayConfig;
    /** Launched processes as (name, process) pairs, in start order. */
    private final List<Pair<String, Process>> processes;

    /**
     * Creates a manager for the given configuration and prepares the directories it needs.
     *
     * @param rayConfig configuration describing paths, ports and resources of this node
     * @throws RuntimeException if the temp directories or the session symlink cannot be created
     */
    public RunManager(RayConfig rayConfig) {
        this.rayConfig = rayConfig;
        this.processes = new ArrayList<>();
        this.createTempDirs();
    }

    /**
     * Terminates every process started by this manager, in reverse start order.
     * The internal process list is left untouched, so {@link #getProcesses(String)} still
     * returns the (now dead) processes afterwards.
     */
    public void cleanup() {
        for (int i = this.processes.size() - 1; i >= 0; --i) {
            Pair<String, Process> pair = this.processes.get(i);
            this.terminateProcess(pair.getLeft(), pair.getRight());
        }
    }

    /**
     * Blocks until the given process is dead. The first attempt uses {@link Process#destroy()};
     * every later attempt uses {@link Process#destroyForcibly()}.
     *
     * <p>If the calling thread is interrupted while waiting, termination still runs to
     * completion and the thread's interrupt status is restored before returning.
     *
     * @param name human-readable process name, used for logging only
     * @param p the process to terminate
     */
    public void terminateProcess(String name, Process p) {
        boolean interrupted = false;
        int numAttempts = 0;
        while (p.isAlive()) {
            if (numAttempts == 0) {
                LOGGER.debug("Terminating process {}.", name);
                p.destroy();
            } else {
                LOGGER.debug("Terminating process {} forcibly.", name);
                p.destroyForcibly();
            }
            try {
                p.waitFor(KILL_PROCESS_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOGGER.warn("Got InterruptedException while waiting for process {} to be terminated.", name);
                // Do not re-interrupt here: waitFor would then throw immediately on every
                // iteration and the loop would spin. Restore the flag once we are done.
                interrupted = true;
            }
            ++numAttempts;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        LOGGER.debug("Process {} is now terminated.", name);
    }

    /**
     * Returns all tracked processes that were started under the given name.
     *
     * @param name the process name passed when the process was started
     * @return a new list of matching processes; empty if there are none
     */
    public List<Process> getProcesses(String name) {
        return this.processes.stream()
                .filter(pair -> pair.getLeft().equals(name))
                .map(Pair::getRight)
                .collect(Collectors.toList());
    }

    /**
     * Creates the log directory and the parent directories of the raylet and object store
     * sockets, then (re)points the {@code session_latest} symlink, located next to the
     * session directory, at the current session directory.
     *
     * @throws RuntimeException wrapping the {@link IOException} if any step fails
     */
    private void createTempDirs() {
        try {
            FileUtils.forceMkdir(new File(this.rayConfig.logDir));
            FileUtils.forceMkdir(new File(this.rayConfig.rayletSocketName).getParentFile());
            FileUtils.forceMkdir(new File(this.rayConfig.objectStoreSocketName).getParentFile());
            String parentOfSessionDir = new File(this.rayConfig.sessionDir).getParent();
            File sessionLatest = new File(String.format("%s/%s", parentOfSessionDir, SESSION_LATEST));
            // File.exists() follows symlinks and therefore reports false for a dangling link
            // left over from a removed session; deleteIfExists operates on the link itself.
            Files.deleteIfExists(Paths.get(sessionLatest.getAbsolutePath()));
            Files.createSymbolicLink(
                    Paths.get(sessionLatest.getAbsolutePath()),
                    Paths.get(this.rayConfig.sessionDir));
        } catch (IOException e) {
            LOGGER.error("Couldn't create temp directories.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Picks a pair of not-yet-existing stdout/stderr log files for a process.
     * The first candidate is {@code <logDir>/<processName>.out|.err}; if either file already
     * exists, {@code <processName>.1}, {@code <processName>.2}, ... are tried in turn.
     *
     * @param logDir directory in which the log files live
     * @param processName base name of the log files
     * @return pair of (stdout file, stderr file), neither of which exists yet
     */
    private Pair<File, File> getLogFiles(String logDir, String processName) {
        int suffixIndex = 0;
        while (true) {
            String suffix = suffixIndex == 0 ? "" : "." + suffixIndex;
            // The numeric suffix goes after the process name, e.g. "raylet.1.out".
            File stdout = new File(String.format("%s/%s%s.out", logDir, processName, suffix));
            File stderr = new File(String.format("%s/%s%s.err", logDir, processName, suffix));
            if (!stdout.exists() && !stderr.exists()) {
                return ImmutablePair.of(stdout, stderr);
            }
            ++suffixIndex;
        }
    }

    /**
     * Launches a process with stdout/stderr redirected to fresh log files, waits briefly and
     * verifies that it is still alive, then records it for later cleanup.
     *
     * @param command executable and arguments
     * @param env extra environment variables to add; may be {@code null} or empty
     * @param name name under which the process is tracked and logged
     * @throws RuntimeException if the process cannot be started or exits during the grace period
     */
    private void startProcess(List<String> command, Map<String, String> env, String name) {
        if (LOGGER.isDebugEnabled()) {
            // NOTE(review): the command line may contain the redis password; consider masking.
            LOGGER.debug("Starting process {} with command: {}", name, Joiner.on(" ").join(command));
        }
        ProcessBuilder builder = new ProcessBuilder(command);
        Pair<File, File> logFiles = this.getLogFiles(this.rayConfig.logDir, name);
        File stdout = logFiles.getLeft();
        File stderr = logFiles.getRight();
        builder.redirectOutput(stdout);
        builder.redirectError(stderr);
        if (env != null && !env.isEmpty()) {
            builder.environment().putAll(env);
        }

        Process p;
        try {
            p = builder.start();
        } catch (IOException e) {
            LOGGER.error("Failed to start process " + name, e);
            throw new RuntimeException("Failed to start process " + name, e);
        }

        // Give the process a moment so that an immediate crash is detected below.
        try {
            TimeUnit.MILLISECONDS.sleep(PROCESS_STARTUP_CHECK_DELAY_MS);
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for process {} to start.", name);
            Thread.currentThread().interrupt();
        }

        String logHint = String.format(" Logs are redirected to %s and %s.", stdout, stderr);
        if (!p.isAlive()) {
            String message = String.format("Failed to start %s. Exit code: %d.", name, p.exitValue());
            throw new RuntimeException(message + logHint);
        }
        this.processes.add(Pair.of(name, p));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s process started.", name) + logHint);
        }
    }

    /**
     * Starts all Ray processes for this node: GCS (head node only), object store and raylet.
     * If anything fails, the processes started so far are terminated.
     *
     * @param isHead whether this node is the head node and must also start the GCS
     * @throws RuntimeException if any process fails to start
     */
    public void startRayProcesses(boolean isHead) {
        LOGGER.debug("Starting ray processes @ {}.", this.rayConfig.nodeIp);
        try {
            if (isHead) {
                this.startGcs();
            }
            this.startObjectStore();
            this.startRaylet();
            LOGGER.info("All processes started @ {}.", this.rayConfig.nodeIp);
        } catch (Exception e) {
            this.cleanup();
            LOGGER.error("Failed to start ray processes.", e);
            throw new RuntimeException("Failed to start ray processes.", e);
        }
    }

    /**
     * Starts the primary redis instance and all redis shards, seeds the bookkeeping keys in
     * the primary, and, when enabled in the config, starts the {@code gcs_server} binary.
     */
    private void startGcs() {
        String primary = this.startRedisInstance(
                this.rayConfig.nodeIp, this.rayConfig.headRedisPort, this.rayConfig.headRedisPassword, null);
        this.rayConfig.setRedisAddress(primary);
        try (Jedis client = new Jedis("127.0.0.1", this.rayConfig.headRedisPort)) {
            if (!Strings.isNullOrEmpty(this.rayConfig.headRedisPassword)) {
                client.auth(this.rayConfig.headRedisPassword);
            }
            client.set("UseRaylet", "1");
            client.set("JobCounter", "0");
            client.set("NumRedisShards", Integer.toString(this.rayConfig.numberRedisShards));
            // Start each shard and register its address in the primary.
            for (int i = 0; i < this.rayConfig.numberRedisShards; ++i) {
                String shard = this.startRedisInstance(
                        this.rayConfig.nodeIp, this.rayConfig.redisShardPorts[i], this.rayConfig.headRedisPassword, i);
                client.rpush("RedisShards", shard);
            }
        }
        if (this.rayConfig.gcsServiceEnabled) {
            File gcsServerFile = BinaryFileUtil.getFile(this.rayConfig.sessionDir, "gcs_server");
            Preconditions.checkState(gcsServerFile.setExecutable(true));
            List<String> command = ImmutableList.of(
                    gcsServerFile.getAbsolutePath(),
                    String.format("--redis_address=%s", this.rayConfig.getRedisIp()),
                    String.format("--redis_port=%d", this.rayConfig.getRedisPort()),
                    String.format("--config_list=%s", this.buildConfigList()),
                    String.format("--redis_password=%s", this.redisPasswordOption()));
            this.startProcess(command, null, "gcs_server");
        }
    }

    /**
     * Starts one redis server on the given port and applies the initial configuration to it.
     *
     * @param ip address to report in the returned redis address (the client used for the
     *     initial configuration always connects via 127.0.0.1)
     * @param port port the redis server listens on
     * @param password password to require; {@code null} or empty disables authentication
     * @param shard shard index, or {@code null} for the primary instance
     * @return the redis address in {@code ip:port} form
     */
    private String startRedisInstance(String ip, int port, String password, Integer shard) {
        File redisServerFile = BinaryFileUtil.getFile(this.rayConfig.sessionDir, "redis-server");
        Preconditions.checkState(redisServerFile.setExecutable(true));
        List<String> command = Lists.newArrayList(
                redisServerFile.getAbsolutePath(),
                "--protected-mode", "no",
                "--port", String.valueOf(port),
                "--loglevel", "warning",
                "--loadmodule",
                BinaryFileUtil.getFile(this.rayConfig.sessionDir, "libray_redis_module.so").getAbsolutePath());
        if (!Strings.isNullOrEmpty(password)) {
            // Each list element is passed as one argv entry, so no separator is needed.
            command.add("--requirepass");
            command.add(password);
        }
        String name = shard == null ? "redis" : "redis-shard_" + shard;
        this.startProcess(command, null, name);
        try (Jedis client = new Jedis("127.0.0.1", port)) {
            if (!Strings.isNullOrEmpty(password)) {
                client.auth(password);
            }
            client.configSet("notify-keyspace-events", "Kl");
            client.set("redis_start_time", LocalDateTime.now().toString());
        }
        return ip + ":" + port;
    }

    /**
     * Builds the raylet command line from the configuration and starts the raylet.
     *
     * @throws IOException if the Java worker config file cannot be written
     */
    private void startRaylet() throws IOException {
        int hardwareConcurrency = Runtime.getRuntime().availableProcessors();
        // At least 1, at most the smaller of the configured CPU resource and the core count.
        int maximumStartupConcurrency = Math.max(1,
                Math.min(this.rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency));
        File rayletFile = BinaryFileUtil.getFile(this.rayConfig.sessionDir, "raylet");
        Preconditions.checkState(rayletFile.setExecutable(true));
        List<String> command = ImmutableList.of(
                rayletFile.getAbsolutePath(),
                String.format("--raylet_socket_name=%s", this.rayConfig.rayletSocketName),
                String.format("--store_socket_name=%s", this.rayConfig.objectStoreSocketName),
                String.format("--object_manager_port=%d", 0),
                String.format("--node_manager_port=%d", this.rayConfig.getNodeManagerPort()),
                String.format("--node_ip_address=%s", this.rayConfig.nodeIp),
                String.format("--redis_address=%s", this.rayConfig.getRedisIp()),
                String.format("--redis_port=%d", this.rayConfig.getRedisPort()),
                String.format("--num_initial_workers=%d", 0),
                String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency),
                String.format("--static_resource_list=%s",
                        ResourceUtil.getResourcesStringFromMap(this.rayConfig.resources)),
                String.format("--config_list=%s", this.buildConfigList()),
                String.format("--python_worker_command=%s", this.buildPythonWorkerCommand()),
                String.format("--java_worker_command=%s", this.buildWorkerCommand()),
                String.format("--redis_password=%s", this.redisPasswordOption()));
        this.startProcess(command, null, "raylet");
    }

    /** Renders the raylet config parameters as a flat {@code key,value,key,value,...} list. */
    private String buildConfigList() {
        return this.rayConfig.rayletConfigParameters.entrySet().stream()
                .map(entry -> entry.getKey() + "," + entry.getValue())
                .collect(Collectors.joining(","));
    }

    /** Returns the head redis password, or an empty string when none is configured. */
    private String redisPasswordOption() {
        String password = this.rayConfig.headRedisPassword;
        return Strings.isNullOrEmpty(password) ? "" : password;
    }

    /**
     * Joins path entries with {@code ':'}, silently dropping every entry that contains a
     * space (the resulting worker command is itself joined with spaces).
     */
    private String concatPath(Stream<String> stream) {
        return stream.filter(s -> !s.contains(" ")).collect(Collectors.joining(":"));
    }

    /**
     * Writes the rendered config to {@code <sessionDir>/java_worker.conf} and builds the
     * space-separated command used to launch a Java worker.
     *
     * @return the Java worker command line, including the raylet placeholders
     * @throws IOException if the worker config file cannot be written
     */
    private String buildWorkerCommand() throws IOException {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-classpath");
        // Configured classpath entries first, followed by this JVM's own classpath.
        String classpath = this.concatPath(Stream.concat(
                this.rayConfig.classpath.stream(),
                Stream.of(System.getProperty("java.class.path").split(":"))));
        cmd.add(classpath);
        File workerConfigFile = new File(this.rayConfig.sessionDir + "/java_worker.conf");
        FileUtils.write(workerConfigFile, this.rayConfig.render(), Charset.defaultCharset());
        cmd.add("-Dray.config-file=" + workerConfigFile.getAbsolutePath());
        cmd.add("RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER");
        cmd.addAll(this.rayConfig.jvmParameters);
        cmd.add("RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_0");
        cmd.add(WORKER_CLASS);
        String command = Joiner.on(" ").join(cmd);
        LOGGER.debug("Worker command is: {}", command);
        return command;
    }

    /** Starts the plasma object store with the configured socket name and memory size. */
    private void startObjectStore() {
        File objectStoreFile = BinaryFileUtil.getFile(this.rayConfig.sessionDir, "plasma_store_server");
        Preconditions.checkState(objectStoreFile.setExecutable(true));
        List<String> command = ImmutableList.of(
                objectStoreFile.getAbsolutePath(),
                "-s", this.rayConfig.objectStoreSocketName,
                "-m", this.rayConfig.objectStoreSize.toString());
        this.startProcess(command, null, "plasma_store");
    }

    /**
     * Builds the space-separated command used to launch a Python worker.
     *
     * @return the Python worker command line, or an empty string if no Python worker
     *     command is configured
     */
    private String buildPythonWorkerCommand() {
        if (this.rayConfig.pythonWorkerCommand == null) {
            return "";
        }
        List<String> cmd = new ArrayList<>();
        cmd.add(this.rayConfig.pythonWorkerCommand);
        cmd.add("--node-ip-address=" + this.rayConfig.nodeIp);
        cmd.add("--object-store-name=" + this.rayConfig.objectStoreSocketName);
        cmd.add("--raylet-name=" + this.rayConfig.rayletSocketName);
        cmd.add("--redis-address=" + this.rayConfig.getRedisAddress());
        String command = String.join(" ", cmd);
        LOGGER.debug("python worker command: {}", command);
        return command;
    }
}

