/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import io.ray.api.BaseActor;
import io.ray.api.Ray;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.runtime.core.collector.OutputCollector;
import io.ray.streaming.runtime.core.graph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.ExecutionNode;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.transfer.ChannelID;
import io.ray.streaming.runtime.transfer.DataReader;
import io.ray.streaming.runtime.transfer.DataWriter;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.RayRuntimeContext;
import io.ray.streaming.util.Config;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a streaming task that runs on its own daemon thread inside a {@link JobWorker}.
 *
 * <p>Construction wires the task to its neighbours in the execution graph: one {@link DataWriter}
 * (and matching {@link OutputCollector}) per output edge that has target actors, a single
 * {@link DataReader} covering all input edges, and then opens the {@link Processor} with the
 * collectors. Subclasses provide the {@link Runnable#run()} loop plus {@link #init()} and
 * {@link #cancelTask()}.
 */
public abstract class StreamTask implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

    /** Id of this task; used as one endpoint when generating channel ids. */
    protected int taskId;
    /** Processor opened during construction with this task's output collectors. */
    protected Processor processor;
    /** Owning worker; source of config, execution graph, node and task. */
    protected JobWorker worker;
    /** Reader over all input channels; stays {@code null} when the node has no upstream actors. */
    protected DataReader reader;
    /** Writers created for each output edge that has at least one target actor. */
    private final Map<ExecutionEdge, DataWriter> writers = new HashMap<>();
    /** Daemon thread that executes this task once {@link #start()} is called. */
    private final Thread thread;

    /**
     * Creates the task, sets up its channels, opens the processor and prepares (but does not
     * start) the worker thread.
     *
     * @param taskId id of this task within the execution graph
     * @param processor processor to open with the output collectors
     * @param worker worker that owns this task
     */
    public StreamTask(int taskId, Processor processor, JobWorker worker) {
        this.taskId = taskId;
        this.processor = processor;
        this.worker = worker;
        // NOTE(review): prepareTask() registers a shutdown hook capturing `this` before subclass
        // constructors have finished; cancelTask() implementations should tolerate that.
        this.prepareTask();
        this.thread = new Thread(
            Ray.wrapRunnable(this), this.getClass().getName() + "-" + System.currentTimeMillis());
        this.thread.setDaemon(true);
    }

    /**
     * Builds the channel configuration, creates writers and the reader, opens the processor and
     * registers the shutdown hook. The order of these steps matches the original implementation.
     */
    private void prepareTask() {
        Map<String, String> queueConf = this.buildQueueConf();
        ExecutionGraph executionGraph = this.worker.getExecutionGraph();
        ExecutionNode executionNode = this.worker.getExecutionNode();

        List<Collector> collectors = this.createWriters(executionGraph, executionNode, queueConf);
        this.createReader(executionGraph, executionNode, queueConf);

        RayRuntimeContext runtimeContext = new RayRuntimeContext(
            this.worker.getExecutionTask(), this.worker.getConfig(), executionNode.getParallelism());
        this.processor.open(collectors, runtimeContext);

        this.registerShutdownHook();
    }

    /**
     * Copies the worker config into a string map and adds the channel size and channel type.
     *
     * @return configuration passed to the {@link DataWriter}s and the {@link DataReader}
     */
    private Map<String, String> buildQueueConf() {
        Map<String, String> queueConf = new HashMap<>();
        // Key cast kept from the original: the key type of getConfig() is not visible here.
        this.worker.getConfig().forEach((k, v) -> queueConf.put((String) k, String.valueOf(v)));
        String queueSize =
            this.worker.getConfig().getOrDefault("channel_size", Config.CHANNEL_SIZE_DEFAULT);
        queueConf.put("channel_size", queueSize);
        // In-memory channels in single-process mode, native channels otherwise.
        String channelType =
            Ray.getRuntimeContext().isSingleProcess() ? "memory_channel" : "native_channel";
        queueConf.put("channel_type", channelType);
        return queueConf;
    }

    /**
     * Creates one {@link DataWriter} and {@link OutputCollector} per output edge that has target
     * actors, recording each writer in {@link #writers}.
     *
     * @param executionGraph graph used to resolve the target actors and the build time
     * @param executionNode node whose output edges are wired
     * @param queueConf channel configuration
     * @return the collectors, in output-edge order, for edges that have target actors
     */
    private List<Collector> createWriters(
            ExecutionGraph executionGraph, ExecutionNode executionNode, Map<String, String> queueConf) {
        List<Collector> collectors = new ArrayList<>();
        for (ExecutionEdge edge : executionNode.getOutputEdges()) {
            Map<String, BaseActor> outputActors = new HashMap<>();
            Map<Integer, BaseActor> taskId2Worker =
                executionGraph.getTaskId2WorkerByNodeId(edge.getTargetNodeId());
            taskId2Worker.forEach((targetTaskId, targetActor) -> {
                String queueName =
                    ChannelID.genIdStr(this.taskId, targetTaskId, executionGraph.getBuildTime());
                outputActors.put(queueName, targetActor);
            });
            if (outputActors.isEmpty()) {
                continue;
            }
            // keySet() and values() of the same unmodified map iterate in the same order, so the
            // channel ids stay aligned with the actors handed to the collector.
            List<String> channelIDs = new ArrayList<>(outputActors.keySet());
            DataWriter writer = new DataWriter(channelIDs, outputActors, queueConf);
            LOG.info("Create DataWriter succeed.");
            this.writers.put(edge, writer);
            Partition partition = edge.getPartition();
            collectors.add(new OutputCollector(writer, channelIDs, outputActors.values(), partition));
        }
        return collectors;
    }

    /**
     * Creates a single {@link DataReader} over the channels from every upstream actor of all input
     * edges; leaves {@link #reader} untouched when there are none.
     *
     * @param executionGraph graph used to resolve the source actors and the build time
     * @param executionNode node whose input edges are wired
     * @param queueConf channel configuration
     */
    private void createReader(
            ExecutionGraph executionGraph, ExecutionNode executionNode, Map<String, String> queueConf) {
        Map<String, BaseActor> inputActors = new HashMap<>();
        for (ExecutionEdge edge : executionNode.getInputsEdges()) {
            Map<Integer, BaseActor> taskId2Worker =
                executionGraph.getTaskId2WorkerByNodeId(edge.getSrcNodeId());
            taskId2Worker.forEach((srcTaskId, srcActor) -> {
                String queueName =
                    ChannelID.genIdStr(srcTaskId, this.taskId, executionGraph.getBuildTime());
                inputActors.put(queueName, srcActor);
            });
        }
        if (inputActors.isEmpty()) {
            return;
        }
        List<String> channelIDs = new ArrayList<>(inputActors.keySet());
        LOG.info("Register queue consumer, queues {}.", channelIDs);
        this.reader = new DataReader(channelIDs, inputActors, queueConf);
    }

    /**
     * Registers a JVM shutdown hook that calls {@link #cancelTask()}; failures are logged rather
     * than propagated so that other shutdown hooks are not affected.
     */
    private void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.cancelTask();
            } catch (Exception e) {
                LOG.error("Failed to cancel {}-{} during shutdown.",
                    this.getClass().getSimpleName(), this.taskId, e);
            }
        }));
    }

    /**
     * Subclass initialization hook. It is not invoked from this class; presumably subclasses call
     * it from their {@code run()} implementation (confirm against subclasses).
     *
     * @throws Exception if initialization fails
     */
    protected abstract void init() throws Exception;

    /**
     * Cancels this task. Invoked from the JVM shutdown hook registered during construction.
     *
     * @throws Exception if cancellation fails
     */
    protected abstract void cancelTask() throws Exception;

    /** Starts the task's daemon thread and logs the task's simple class name and id. */
    public void start() {
        this.thread.start();
        LOG.info("started {}-{}", this.getClass().getSimpleName(), this.taskId);
    }
}

