/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.python;

import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.streaming.api.function.Function;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.python.PythonFunction;
import org.ray.streaming.python.PythonPartition;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.generated.RemoteCall;
import org.ray.streaming.runtime.generated.Streaming;
import org.ray.streaming.runtime.python.MsgPackSerializer;

public class GraphPbBuilder {
    private MsgPackSerializer serializer = new MsgPackSerializer();

    public RemoteCall.ExecutionGraph buildExecutionGraphPb(ExecutionGraph graph) {
        RemoteCall.ExecutionGraph.Builder builder = RemoteCall.ExecutionGraph.newBuilder();
        builder.setBuildTime(graph.getBuildTime());
        for (ExecutionNode node : graph.getExecutionNodeList()) {
            RemoteCall.ExecutionGraph.ExecutionNode.Builder nodeBuilder = RemoteCall.ExecutionGraph.ExecutionNode.newBuilder();
            nodeBuilder.setNodeId(node.getNodeId());
            nodeBuilder.setParallelism(node.getParallelism());
            nodeBuilder.setNodeType(Streaming.NodeType.valueOf(node.getNodeType().name()));
            nodeBuilder.setLanguage(Streaming.Language.valueOf(node.getLanguage().name()));
            byte[] functionBytes = this.serializeFunction(node.getStreamOperator().getFunction());
            nodeBuilder.setFunction(ByteString.copyFrom(functionBytes));
            for (ExecutionTask task : node.getExecutionTasks()) {
                RemoteCall.ExecutionGraph.ExecutionTask.Builder taskBuilder = RemoteCall.ExecutionGraph.ExecutionTask.newBuilder();
                byte[] serializedActorHandle = ((NativeRayActor)task.getWorker()).toBytes();
                taskBuilder.setTaskId(task.getTaskId()).setTaskIndex(task.getTaskIndex()).setWorkerActor(ByteString.copyFrom(serializedActorHandle));
                nodeBuilder.addExecutionTasks(taskBuilder.build());
            }
            for (ExecutionEdge edge : node.getInputsEdges()) {
                nodeBuilder.addInputEdges(this.buildEdgePb(edge));
            }
            for (ExecutionEdge edge : node.getOutputEdges()) {
                nodeBuilder.addOutputEdges(this.buildEdgePb(edge));
            }
            builder.addExecutionNodes(nodeBuilder.build());
        }
        return builder.build();
    }

    private RemoteCall.ExecutionGraph.ExecutionEdge buildEdgePb(ExecutionEdge edge) {
        RemoteCall.ExecutionGraph.ExecutionEdge.Builder edgeBuilder = RemoteCall.ExecutionGraph.ExecutionEdge.newBuilder();
        edgeBuilder.setSrcNodeId(edge.getSrcNodeId());
        edgeBuilder.setTargetNodeId(edge.getTargetNodeId());
        edgeBuilder.setPartition(ByteString.copyFrom(this.serializePartition(edge.getPartition())));
        return edgeBuilder.build();
    }

    private byte[] serializeFunction(Function function) {
        if (function instanceof PythonFunction) {
            PythonFunction pyFunc = (PythonFunction)function;
            return this.serializer.serialize(Arrays.asList(pyFunc.getFunction(), pyFunc.getModuleName(), pyFunc.getClassName(), pyFunc.getFunctionName(), pyFunc.getFunctionInterface()));
        }
        return new byte[0];
    }

    private byte[] serializePartition(Partition partition) {
        if (partition instanceof PythonPartition) {
            PythonPartition pythonPartition = (PythonPartition)partition;
            return this.serializer.serialize(Arrays.asList(pythonPartition.getPartition(), pythonPartition.getModuleName(), pythonPartition.getClassName(), pythonPartition.getFunctionName()));
        }
        return new byte[0];
    }
}

