/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.api.stream;

import com.google.common.base.Preconditions;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.api.partition.impl.RoundRobinPartition;
import io.ray.streaming.operator.Operator;
import io.ray.streaming.operator.StreamOperator;
import io.ray.streaming.python.PythonPartition;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Abstract base for the nodes of a streaming pipeline.
 *
 * <p>An instance is one of two kinds:
 * <ul>
 *   <li>a <em>regular</em> stream, which owns its parallelism, partition and config; or</li>
 *   <li>a <em>proxy</em> stream, built with {@link #Stream(Stream)}, which copies the id,
 *       context, input stream and operator of another stream and forwards every
 *       parallelism / partition / config access to that original stream.</li>
 * </ul>
 *
 * @param <S> concrete stream type, returned by the fluent setters
 * @param <T> element type handled by this stream's {@link Partition}
 */
public abstract class Stream<S extends Stream<S, T>, T> implements Serializable {
    private final int id;
    private final StreamingContext streamingContext;
    /** Upstream stream; {@code null} when this stream was built without an input stream. */
    private final Stream inputStream;
    private final StreamOperator operator;
    /** Own parallelism; only meaningful for non-proxy streams. */
    private int parallelism = 1;
    /** Own config; only meaningful for non-proxy streams. */
    private Map<String, String> config = new HashMap<>();
    /** Own partition; only meaningful for non-proxy streams. */
    private Partition<T> partition;
    /** Delegate target when this is a proxy stream, otherwise {@code null}. */
    private Stream originalStream;

    /**
     * Creates a stream without an input stream, choosing the partition from the
     * operator's language.
     *
     * @param streamingContext context this stream belongs to
     * @param streamOperator operator attached to this stream
     */
    public Stream(StreamingContext streamingContext, StreamOperator streamOperator) {
        this(streamingContext, null, streamOperator, Stream.selectPartition(streamOperator));
    }

    /**
     * Creates a stream without an input stream, using the given partition.
     *
     * @param streamingContext context this stream belongs to
     * @param streamOperator operator attached to this stream
     * @param partition partition to use
     */
    public Stream(StreamingContext streamingContext, StreamOperator streamOperator, Partition<T> partition) {
        this(streamingContext, null, streamOperator, partition);
    }

    /**
     * Creates a stream downstream of {@code inputStream}, choosing the partition from the
     * operator's language. The context is taken from the input stream.
     *
     * @param inputStream upstream stream; must not be {@code null}
     * @param streamOperator operator attached to this stream
     */
    public Stream(Stream inputStream, StreamOperator streamOperator) {
        this(inputStream.getStreamingContext(), inputStream, streamOperator, Stream.selectPartition(streamOperator));
    }

    /**
     * Creates a stream downstream of {@code inputStream}, using the given partition.
     * The context is taken from the input stream.
     *
     * @param inputStream upstream stream; must not be {@code null}
     * @param streamOperator operator attached to this stream
     * @param partition partition to use
     */
    public Stream(Stream inputStream, StreamOperator streamOperator, Partition<T> partition) {
        this(inputStream.getStreamingContext(), inputStream, streamOperator, partition);
    }

    /**
     * Common constructor for regular (non-proxy) streams. Obtains a new id from the
     * context and, when an input stream is given, inherits its parallelism.
     *
     * @param streamingContext context this stream belongs to
     * @param inputStream upstream stream, or {@code null}
     * @param streamOperator operator attached to this stream
     * @param partition partition to use
     */
    protected Stream(StreamingContext streamingContext, Stream inputStream, StreamOperator streamOperator, Partition<T> partition) {
        this.streamingContext = streamingContext;
        this.inputStream = inputStream;
        this.operator = streamOperator;
        this.partition = partition;
        this.id = streamingContext.generateId();
        if (inputStream != null) {
            this.parallelism = inputStream.getParallelism();
        }
    }

    /**
     * Creates a proxy of {@code originalStream}. The proxy reuses the original's id,
     * context, input stream and operator; it does not request a new id.
     *
     * @param originalStream stream to proxy; must not be {@code null}
     */
    protected Stream(Stream originalStream) {
        this.originalStream = originalStream;
        this.id = originalStream.getId();
        this.streamingContext = originalStream.getStreamingContext();
        this.inputStream = originalStream.getInputStream();
        this.operator = originalStream.getOperator();
    }

    /**
     * Picks the round-robin partition matching the operator's language.
     *
     * @param operator operator whose language decides the partition
     * @return the partition for that language
     * @throws UnsupportedOperationException if the language is neither PYTHON nor JAVA
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <T> Partition<T> selectPartition(Operator operator) {
        switch (operator.getLanguage()) {
            case PYTHON:
                // NOTE(review): unchecked cast; assumes the shared Python partition constant
                // is usable for any element type - confirm against PythonPartition.
                return (Partition<T>) PythonPartition.RoundRobinPartition;
            case JAVA:
                return new RoundRobinPartition();
            default:
                throw new UnsupportedOperationException("Unsupported language " + operator.getLanguage());
        }
    }

    /** @return the id of this stream (for a proxy, the id copied from the original stream) */
    public int getId() {
        return this.id;
    }

    /** @return the context this stream belongs to */
    public StreamingContext getStreamingContext() {
        return this.streamingContext;
    }

    /** @return the upstream stream, or {@code null} if this stream was built without one */
    public Stream getInputStream() {
        return this.inputStream;
    }

    /** @return the operator attached to this stream */
    public StreamOperator getOperator() {
        return this.operator;
    }

    /** Returns {@code this} typed as the concrete stream type, for fluent chaining. */
    @SuppressWarnings("unchecked")
    private S self() {
        // Unchecked by construction: relies on subclasses binding S to their own type.
        return (S) this;
    }

    /** @return the parallelism; a proxy reports the original stream's value */
    public int getParallelism() {
        if (this.isProxyStream()) {
            return this.originalStream.getParallelism();
        }
        return this.parallelism;
    }

    /**
     * Sets the parallelism; on a proxy the value is applied to the original stream.
     *
     * @param parallelism new parallelism
     * @return this stream
     */
    public S setParallelism(int parallelism) {
        if (this.isProxyStream()) {
            this.originalStream.setParallelism(parallelism);
        } else {
            this.parallelism = parallelism;
        }
        return this.self();
    }

    /** @return the partition; a proxy reports the original stream's partition */
    public Partition<T> getPartition() {
        if (this.isProxyStream()) {
            return this.originalStream.getPartition();
        }
        return this.partition;
    }

    /**
     * Sets the partition; on a proxy the value is applied to the original stream.
     *
     * @param partition new partition
     * @return this stream
     */
    protected S setPartition(Partition<T> partition) {
        if (this.isProxyStream()) {
            this.originalStream.setPartition(partition);
        } else {
            this.partition = partition;
        }
        return this.self();
    }

    /**
     * Adds every entry of {@code config} via {@link #withConfig(String, String)}.
     *
     * @param config entries to add; must not be {@code null}
     * @return this stream
     */
    public S withConfig(Map<String, String> config) {
        config.forEach(this::withConfig);
        return this.self();
    }

    /**
     * Adds a single config entry; on a proxy the entry is added to the original stream.
     *
     * @param key config key
     * @param value config value
     * @return this stream
     */
    public S withConfig(String key, String value) {
        if (this.isProxyStream()) {
            this.originalStream.withConfig(key, value);
        } else {
            this.config.put(key, value);
        }
        return this.self();
    }

    /**
     * @return the config map itself (not a copy); a proxy returns the original stream's map
     */
    public Map<String, String> getConfig() {
        if (this.isProxyStream()) {
            return this.originalStream.getConfig();
        }
        return this.config;
    }

    /** @return {@code true} if this stream was created as a proxy of another stream */
    public boolean isProxyStream() {
        return this.originalStream != null;
    }

    /**
     * Returns the stream this proxy delegates to.
     *
     * @return the original stream
     * @throws IllegalArgumentException if this stream is not a proxy
     */
    public Stream getOriginalStream() {
        Preconditions.checkArgument(this.isProxyStream(), "Stream %s is not a proxy stream", (Object) this.id);
        return this.originalStream;
    }

    /** @return the language of this stream, as defined by the concrete subclass */
    public abstract Language getLanguage();
}

