/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.api.context;

import com.google.common.base.Preconditions;
import io.ray.api.Ray;
import io.ray.streaming.api.context.ClusterStarter;
import io.ray.streaming.api.stream.StreamSink;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobGraphBuilder;
import io.ray.streaming.schedule.JobScheduler;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the sinks and configuration of a streaming job and launches it.
 *
 * <p>Sinks are registered with {@link #addSink(StreamSink)} and the job configuration is supplied
 * with {@link #withConfig(Map)}. {@link #execute(String)} builds a {@link JobGraph} from the
 * registered sinks, starts a cluster through {@link ClusterStarter} when {@code Ray.internal()}
 * returns {@code null}, and hands the graph to the first {@link JobScheduler} found by
 * {@link ServiceLoader}.
 *
 * <p>The id counter is {@code transient}: it is not written out with the context, and a
 * deserialized copy starts counting from zero again.
 */
public class StreamingContext implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContext.class);

    /** Source of the values returned by {@link #generateId()}; recreated on deserialization. */
    private transient AtomicInteger idGenerator = new AtomicInteger(0);

    /** Sinks registered through {@link #addSink(StreamSink)}; input to the job graph builder. */
    private List<StreamSink> streamSinks = new ArrayList<StreamSink>();

    /** Job configuration; read in {@link #execute(String)} and passed to the scheduler. */
    private Map<String, String> jobConfig = new HashMap<String, String>();

    /** Graph built by {@link #execute(String)}; {@code null} until then. */
    private JobGraph jobGraph;

    private StreamingContext() {
    }

    /**
     * Creates a new, empty context.
     *
     * @return a context with no sinks and an empty configuration
     */
    public static StreamingContext buildContext() {
        return new StreamingContext();
    }

    /**
     * Builds the job graph from the registered sinks and submits it to a {@link JobScheduler}.
     *
     * <p>When {@code Ray.internal()} returns {@code null}, a cluster is started first and a JVM
     * shutdown hook calling {@link #stop()} is registered; otherwise the existing cluster is
     * reused.
     *
     * @param jobName name handed to the {@link JobGraphBuilder} and used in log messages
     * @throws IllegalArgumentException if no {@link JobScheduler} implementation is available
     *     through {@link ServiceLoader}, or if the {@code channel_type} config value is
     *     {@code memory_channel} while the graph is a cross-language graph
     */
    public void execute(String jobName) {
        JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
        this.jobGraph = jobGraphBuilder.build();
        this.jobGraph.printJobGraph();
        // The cast pins the single-argument logger overload regardless of the digraph's static type.
        LOG.info("JobGraph digraph\n{}", (Object) this.jobGraph.generateDigraph());
        if (Ray.internal() == null) {
            if ("memory_channel".equalsIgnoreCase(this.jobConfig.get("channel_type"))) {
                // The memory channel branch rejects cross-language graphs up front.
                Preconditions.checkArgument(!this.jobGraph.isCrossLanguageGraph());
                // NOTE(review): the second flag presumably requests a local, in-process cluster
                // (matches the log message below) - confirm against ClusterStarter.
                ClusterStarter.startCluster(false, true);
                LOG.info("Created local cluster for job {}.", jobName);
            } else {
                ClusterStarter.startCluster(this.jobGraph.isCrossLanguageGraph(), false);
                LOG.info("Created multi process cluster for job {}.", jobName);
            }
            // Only a cluster started by this call gets a shutdown hook.
            Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
        } else {
            LOG.info("Reuse existing cluster.");
        }
        ServiceLoader<JobScheduler> serviceLoader = ServiceLoader.load(JobScheduler.class);
        Iterator<JobScheduler> iterator = serviceLoader.iterator();
        Preconditions.checkArgument(iterator.hasNext(), "No JobScheduler implementation has been provided.");
        JobScheduler jobScheduler = iterator.next();
        jobScheduler.schedule(this.jobGraph, this.jobConfig);
    }

    /**
     * Returns the next id from this context's counter.
     *
     * @return the incremented counter value; the first call on a fresh context returns 1
     */
    public int generateId() {
        return this.idGenerator.incrementAndGet();
    }

    /**
     * Registers a sink to be included when the job graph is built.
     *
     * @param streamSink the sink to append to this context's sink list
     */
    public void addSink(StreamSink streamSink) {
        this.streamSinks.add(streamSink);
    }

    /**
     * Returns the sinks registered so far.
     *
     * @return the context's own sink list (not a copy)
     */
    public List<StreamSink> getStreamSinks() {
        return this.streamSinks;
    }

    /**
     * Replaces the job configuration.
     *
     * @param jobConfig the configuration map; stored by reference, not copied
     */
    public void withConfig(Map<String, String> jobConfig) {
        this.jobConfig = jobConfig;
    }

    /**
     * Stops the cluster when {@code Ray.internal()} is non-null; otherwise does nothing.
     *
     * <p>May be called before {@link #execute(String)}: with no job graph built yet, the
     * cluster is stopped as for a non-cross-language graph.
     */
    public void stop() {
        if (Ray.internal() == null) {
            return;
        }
        // jobGraph is only assigned by execute(); guard so an early stop() does not throw.
        boolean crossLanguage = this.jobGraph != null && this.jobGraph.isCrossLanguageGraph();
        ClusterStarter.stopCluster(crossLanguage);
    }

    /**
     * Restores the non-transient state and recreates the transient id counter.
     *
     * <p>Field initializers do not run during deserialization, so without this hook
     * {@code idGenerator} would be {@code null} and {@link #generateId()} would throw.
     *
     * @param in the stream the context is being read from
     * @throws IOException if the underlying stream fails
     * @throws ClassNotFoundException if the class of a serialized field cannot be found
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.idGenerator = new AtomicInteger(0);
    }
}

