/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker;

import io.ray.api.Ray;
import io.ray.streaming.runtime.core.graph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.ExecutionNode;
import io.ray.streaming.runtime.core.graph.ExecutionTask;
import io.ray.streaming.runtime.core.processor.OneInputProcessor;
import io.ray.streaming.runtime.core.processor.ProcessBuilder;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.core.processor.StreamProcessor;
import io.ray.streaming.runtime.transfer.TransferHandler;
import io.ray.streaming.runtime.util.EnvUtil;
import io.ray.streaming.runtime.worker.context.WorkerContext;
import io.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import io.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.io.Serializable;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that hosts a single streaming task.
 *
 * <p>{@link #init(WorkerContext)} resolves the task's execution node from the execution graph,
 * builds the matching {@link StreamProcessor}, and starts a {@link StreamTask} for it. The
 * {@code on*Message} methods forward raw transfer buffers to the {@link TransferHandler}, which
 * only exists after {@code init} has run in a non-single-process runtime.
 */
public class JobWorker
implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobWorker.class);

    /** Reply returned by the sync message handlers while no transfer handler exists (4 zero bytes). */
    private static final byte[] NOT_READY_FLAG = new byte[4];

    private int taskId;
    private Map<String, String> config;
    private WorkerContext workerContext;
    private ExecutionNode executionNode;
    private ExecutionTask executionTask;
    private ExecutionGraph executionGraph;
    private StreamProcessor streamProcessor;
    private ExecutionNode.NodeType nodeType;
    private StreamTask task;

    /** Stays {@code null} until {@link #init(WorkerContext)} runs, and always in single-process mode. */
    private TransferHandler transferHandler;

    /**
     * Initializes this worker from the given context and starts its stream task.
     *
     * @param workerContext context supplying the task id, the config and the execution graph
     * @return always {@code true}; failures surface as exceptions
     */
    public Boolean init(WorkerContext workerContext) {
        this.workerContext = workerContext;
        this.taskId = workerContext.getTaskId();
        this.config = workerContext.getConfig();
        this.executionGraph = this.workerContext.getExecutionGraph();
        this.executionTask = this.executionGraph.getExecutionTaskByTaskId(this.taskId);
        this.executionNode = this.executionGraph.getExecutionNodeByTaskId(this.taskId);
        this.nodeType = this.executionNode.getNodeType();
        this.streamProcessor = ProcessBuilder.buildProcessor(this.executionNode.getStreamOperator());
        LOGGER.info("Initializing StreamWorker, pid {}, taskId: {}, operator: {}.", EnvUtil.getJvmPid(), this.taskId, this.streamProcessor);
        // The transfer handler is only created when the runtime is not single-process.
        if (!Ray.getRuntimeContext().isSingleProcess()) {
            this.transferHandler = new TransferHandler();
        }
        this.task = this.createStreamTask();
        this.task.start();
        return true;
    }

    /**
     * Creates the stream task that matches the concrete type of the built processor.
     *
     * @return a {@link OneInputStreamTask} or a {@link SourceStreamTask}
     * @throws RuntimeException if the processor is neither a one-input nor a source processor
     */
    private StreamTask createStreamTask() {
        if (this.streamProcessor instanceof OneInputProcessor) {
            return new OneInputStreamTask(this.taskId, this.streamProcessor, this);
        }
        if (this.streamProcessor instanceof SourceProcessor) {
            return new SourceStreamTask(this.taskId, this.streamProcessor, this);
        }
        throw new RuntimeException("Unsupported type: " + this.streamProcessor);
    }

    /** @return the task id taken from the worker context in {@link #init(WorkerContext)} */
    public int getTaskId() {
        return this.taskId;
    }

    /** @return the config taken from the worker context, or {@code null} before initialization */
    public Map<String, String> getConfig() {
        return this.config;
    }

    /** @return the context passed to {@link #init(WorkerContext)}, or {@code null} before it */
    public WorkerContext getWorkerContext() {
        return this.workerContext;
    }

    /** @return the node type of this worker's execution node, or {@code null} before initialization */
    public ExecutionNode.NodeType getNodeType() {
        return this.nodeType;
    }

    /** @return the execution node looked up by task id, or {@code null} before initialization */
    public ExecutionNode getExecutionNode() {
        return this.executionNode;
    }

    /** @return the execution task looked up by task id, or {@code null} before initialization */
    public ExecutionTask getExecutionTask() {
        return this.executionTask;
    }

    /** @return the execution graph taken from the worker context, or {@code null} before initialization */
    public ExecutionGraph getExecutionGraph() {
        return this.executionGraph;
    }

    /** @return the processor built for this worker's operator, or {@code null} before initialization */
    public StreamProcessor getStreamProcessor() {
        return this.streamProcessor;
    }

    /** @return the stream task started by {@link #init(WorkerContext)}, or {@code null} before it */
    public StreamTask getTask() {
        return this.task;
    }

    /**
     * Forwards a reader-side message to the transfer handler.
     *
     * <p>If no transfer handler exists (before {@link #init(WorkerContext)} or in single-process
     * mode) the message is dropped instead of failing with a {@link NullPointerException},
     * mirroring the guard in {@link #onReaderMessageSync(byte[])}.
     *
     * @param buffer raw message bytes, passed through unchanged
     */
    public void onReaderMessage(byte[] buffer) {
        if (this.transferHandler == null) {
            LOGGER.debug("Transfer handler is not ready, dropping reader message.");
            return;
        }
        this.transferHandler.onReaderMessage(buffer);
    }

    /**
     * Forwards a reader-side message to the transfer handler and returns its reply.
     *
     * @param buffer raw message bytes, passed through unchanged
     * @return the handler's reply, or the not-ready flag when no transfer handler exists
     */
    public byte[] onReaderMessageSync(byte[] buffer) {
        if (this.transferHandler == null) {
            return NOT_READY_FLAG;
        }
        return this.transferHandler.onReaderMessageSync(buffer);
    }

    /**
     * Forwards a writer-side message to the transfer handler.
     *
     * <p>If no transfer handler exists (before {@link #init(WorkerContext)} or in single-process
     * mode) the message is dropped instead of failing with a {@link NullPointerException},
     * mirroring the guard in {@link #onWriterMessageSync(byte[])}.
     *
     * @param buffer raw message bytes, passed through unchanged
     */
    public void onWriterMessage(byte[] buffer) {
        if (this.transferHandler == null) {
            LOGGER.debug("Transfer handler is not ready, dropping writer message.");
            return;
        }
        this.transferHandler.onWriterMessage(buffer);
    }

    /**
     * Forwards a writer-side message to the transfer handler and returns its reply.
     *
     * @param buffer raw message bytes, passed through unchanged
     * @return the handler's reply, or the not-ready flag when no transfer handler exists
     */
    public byte[] onWriterMessageSync(byte[] buffer) {
        if (this.transferHandler == null) {
            return NOT_READY_FLAG;
        }
        return this.transferHandler.onWriterMessageSync(buffer);
    }

    // Native libraries are loaded once, when this class is initialized.
    static {
        EnvUtil.loadNativeLibraries();
    }
}

