/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.worker;

import java.io.Serializable;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.annotation.RayRemote;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.core.processor.OneInputProcessor;
import org.ray.streaming.runtime.core.processor.ProcessBuilder;
import org.ray.streaming.runtime.core.processor.SourceProcessor;
import org.ray.streaming.runtime.core.processor.StreamProcessor;
import org.ray.streaming.runtime.transfer.TransferHandler;
import org.ray.streaming.runtime.util.EnvUtil;
import org.ray.streaming.runtime.worker.context.WorkerContext;
import org.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import org.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import org.ray.streaming.runtime.worker.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Ray remote class (see {@link RayRemote}) that hosts one streaming task.
 *
 * <p>{@link #init(WorkerContext)} resolves the task and node for this worker from the
 * execution graph carried by the {@link WorkerContext}, builds the matching
 * {@link StreamProcessor}, optionally creates a {@link TransferHandler}, and starts the
 * {@link StreamTask}. Until {@code init} has been called, all getters return the field
 * defaults ({@code 0} / {@code null}).
 */
@RayRemote
public class JobWorker
implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobWorker.class);

    /** Config key selecting the channel implementation. */
    private static final String CHANNEL_TYPE_KEY = "channel_type";
    /** Channel type for which a {@link TransferHandler} is created; also the default. */
    private static final String NATIVE_CHANNEL = "native_channel";

    private int taskId;
    private Map<String, String> config;
    private WorkerContext workerContext;
    private ExecutionNode executionNode;
    private ExecutionTask executionTask;
    private ExecutionGraph executionGraph;
    private StreamProcessor streamProcessor;
    private ExecutionNode.NodeType nodeType;
    private StreamTask task;
    /** Only assigned by {@link #init(WorkerContext)} when the channel type is native. */
    private TransferHandler transferHandler;

    /**
     * Initializes this worker from the given context and starts its stream task.
     *
     * @param workerContext context supplying the task id, the config map and the execution graph
     * @return always {@code true} once the task has been started
     * @throws RuntimeException if the processor built for this node is neither a
     *     {@link OneInputProcessor} nor a {@link SourceProcessor}
     */
    public Boolean init(WorkerContext workerContext) {
        this.workerContext = workerContext;
        this.taskId = workerContext.getTaskId();
        this.config = workerContext.getConfig();
        this.executionGraph = this.workerContext.getExecutionGraph();
        this.executionTask = this.executionGraph.getExecutionTaskByTaskId(this.taskId);
        this.executionNode = this.executionGraph.getExecutionNodeByTaskId(this.taskId);
        this.nodeType = this.executionNode.getNodeType();
        this.streamProcessor = ProcessBuilder.buildProcessor(this.executionNode.getStreamOperator());
        LOGGER.debug("Initializing StreamWorker, taskId: {}, operator: {}.", this.taskId, this.streamProcessor);

        // getOrDefault returns null when the key is present but mapped to null, so compare
        // from the constant side to avoid a NullPointerException on such a config.
        String channelType = this.config.getOrDefault(CHANNEL_TYPE_KEY, NATIVE_CHANNEL);
        if (NATIVE_CHANNEL.equals(channelType)) {
            this.transferHandler = new TransferHandler(
                    JobWorker.getNativeCoreWorker(),
                    JobWorker.callbackDescriptor("onWriterMessage", "([B)V"),
                    JobWorker.callbackDescriptor("onWriterMessageSync", "([B)[B"),
                    JobWorker.callbackDescriptor("onReaderMessage", "([B)V"),
                    JobWorker.callbackDescriptor("onReaderMessageSync", "([B)[B"));
        }

        this.task = this.createStreamTask();
        this.task.start();
        return true;
    }

    /** Builds a function descriptor for one of this class's message callback methods. */
    private static JavaFunctionDescriptor callbackDescriptor(String methodName, String signature) {
        return new JavaFunctionDescriptor(JobWorker.class.getName(), methodName, signature);
    }

    /**
     * Creates the stream task matching the concrete type of {@link #streamProcessor}.
     *
     * @throws RuntimeException if the processor type is not supported
     */
    private StreamTask createStreamTask() {
        if (this.streamProcessor instanceof OneInputProcessor) {
            return new OneInputStreamTask(this.taskId, this.streamProcessor, this);
        }
        if (this.streamProcessor instanceof SourceProcessor) {
            return new SourceStreamTask(this.taskId, this.streamProcessor, this);
        }
        throw new RuntimeException("Unsupported type: " + this.streamProcessor);
    }

    /** @return the task id taken from the worker context during {@code init} */
    public int getTaskId() {
        return this.taskId;
    }

    /** @return the config map taken from the worker context during {@code init} */
    public Map<String, String> getConfig() {
        return this.config;
    }

    /** @return the context passed to {@code init} */
    public WorkerContext getWorkerContext() {
        return this.workerContext;
    }

    /** @return the node type of this worker's execution node */
    public ExecutionNode.NodeType getNodeType() {
        return this.nodeType;
    }

    /** @return the execution node looked up by this worker's task id */
    public ExecutionNode getExecutionNode() {
        return this.executionNode;
    }

    /** @return the execution task looked up by this worker's task id */
    public ExecutionTask getExecutionTask() {
        return this.executionTask;
    }

    /** @return the execution graph taken from the worker context */
    public ExecutionGraph getExecutionGraph() {
        return this.executionGraph;
    }

    /** @return the processor built from the execution node's stream operator */
    public StreamProcessor getStreamProcessor() {
        return this.streamProcessor;
    }

    /** @return the stream task created and started by {@code init} */
    public StreamTask getTask() {
        return this.task;
    }

    /*
     * Message callbacks. Their names and signatures are registered with the TransferHandler
     * in init(), and each one simply delegates to it. The handler is only created for the
     * native channel type; for any other channel type (or before init) it is null and these
     * methods throw NullPointerException.
     */

    /** Forwards a reader message to the transfer handler. */
    public void onReaderMessage(byte[] buffer) {
        this.transferHandler.onReaderMessage(buffer);
    }

    /** Forwards a reader message to the transfer handler and returns its reply. */
    public byte[] onReaderMessageSync(byte[] buffer) {
        return this.transferHandler.onReaderMessageSync(buffer);
    }

    /** Forwards a writer message to the transfer handler. */
    public void onWriterMessage(byte[] buffer) {
        this.transferHandler.onWriterMessage(buffer);
    }

    /** Forwards a writer message to the transfer handler and returns its reply. */
    public byte[] onWriterMessageSync(byte[] buffer) {
        return this.transferHandler.onWriterMessageSync(buffer);
    }

    /**
     * Returns the native core worker pointer of the current runtime.
     *
     * @return the pointer when the Ray runtime is a {@link RayMultiWorkerNativeRuntime},
     *     otherwise {@code 0}
     */
    private static long getNativeCoreWorker() {
        // Look the runtime up once so the type check and the cast see the same object.
        Object runtime = Ray.internal();
        if (runtime instanceof RayMultiWorkerNativeRuntime) {
            return ((RayMultiWorkerNativeRuntime)runtime).getCurrentRuntime().getNativeCoreWorkerPointer();
        }
        return 0L;
    }

    static {
        EnvUtil.loadNativeLibraries();
    }
}

