/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.python;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.msgpack.core.Preconditions;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.python.PythonFunction;
import org.ray.streaming.python.PythonPartition;
import org.ray.streaming.python.stream.PythonStreamSource;
import org.ray.streaming.runtime.python.MsgPackSerializer;
import org.ray.streaming.runtime.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gateway that exposes streaming-API operations through byte-array entry points.
 *
 * <p>Every public entry point (except {@link #getStreamingContext()}) takes and/or returns
 * payloads encoded by {@link MsgPackSerializer}. Objects created on this side are never
 * serialized themselves: they are stored in an internal reference table and only their
 * reference id (a string starting with {@code __gateway_reference_id__}) is returned. When such
 * an id is later passed back as a call parameter it is replaced by the stored object.
 *
 * <p>The class performs no synchronization of its own.
 * NOTE(review): presumably it is driven by a single caller at a time; confirm before sharing an
 * instance across threads.
 */
public class PythonGateway {
    private static final Logger LOG = LoggerFactory.getLogger(PythonGateway.class);
    private static final String REFERENCE_ID_PREFIX = "__gateway_reference_id__";
    private final MsgPackSerializer serializer = new MsgPackSerializer();
    /** Reference id -> live object handed out to the caller. Entries are never removed. */
    private final Map<String, Object> referenceMap = new HashMap<>();
    private StreamingContext streamingContext;

    public PythonGateway() {
        LOG.info("PythonGateway created");
    }

    /**
     * Builds a new {@link StreamingContext}, makes it the current context of this gateway and
     * registers it in the reference table.
     *
     * @return the serialized reference id of the new context
     */
    public byte[] createStreamingContext() {
        this.streamingContext = StreamingContext.buildContext();
        LOG.info("StreamingContext created");
        return this.serializer.serialize(this.registerReference(this.streamingContext));
    }

    /**
     * @return the current streaming context, or {@code null} if
     *     {@link #createStreamingContext()} has not been called yet
     */
    public StreamingContext getStreamingContext() {
        return this.streamingContext;
    }

    /**
     * Applies a serialized configuration map to the current streaming context.
     *
     * @param confBytes serialized map of configuration entries
     * @return a one-element byte array used as a placeholder result
     * @throws NullPointerException if no streaming context has been created yet
     * @throws RuntimeException wrapping any failure while deserializing or applying the config
     */
    public byte[] withConfig(byte[] confBytes) {
        Preconditions.checkNotNull(this.streamingContext);
        try {
            // The generic signature was lost in decompilation; the raw type is kept so the call
            // below compiles against whatever parameter type withConfig declares.
            @SuppressWarnings({"rawtypes", "unchecked"})
            Map config = (Map) this.serializer.deserialize(confBytes);
            LOG.info("Set config {}", config);
            this.streamingContext.withConfig(config);
            return new byte[1];
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a {@link PythonStreamSource} on the current context from a serialized source
     * function and registers it in the reference table.
     *
     * @param pySourceFunc serialized source function
     * @return the serialized reference id of the new stream source
     * @throws NullPointerException if no streaming context has been created yet
     * @throws RuntimeException wrapping any failure while creating the source
     */
    public byte[] createPythonStreamSource(byte[] pySourceFunc) {
        Preconditions.checkNotNull(this.streamingContext);
        try {
            PythonStreamSource pythonStreamSource = PythonStreamSource.from(this.streamingContext, PythonFunction.fromFunction(pySourceFunc));
            return this.serializer.serialize(this.registerReference(pythonStreamSource));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the job defined on the current streaming context.
     *
     * @param jobNameBytes serialized job name (a string)
     * @return a one-element byte array used as a placeholder result
     * @throws NullPointerException if no streaming context has been created yet
     */
    public byte[] execute(byte[] jobNameBytes) {
        // Fail fast, consistently with withConfig/createPythonStreamSource, instead of logging
        // and deserializing the job name before hitting a null dereference.
        Preconditions.checkNotNull(this.streamingContext);
        LOG.info("Starting executing");
        this.streamingContext.execute((String) this.serializer.deserialize(jobNameBytes));
        return new byte[1];
    }

    /**
     * Wraps a serialized function in a {@link PythonFunction} and registers it.
     *
     * @param pyFunc serialized function
     * @return the serialized reference id of the new {@link PythonFunction}
     */
    public byte[] createPyFunc(byte[] pyFunc) {
        PythonFunction function = PythonFunction.fromFunction(pyFunc);
        return this.serializer.serialize(this.registerReference(function));
    }

    /**
     * Wraps a serialized partition function in a {@link PythonPartition} and registers it.
     *
     * @param pyPartition serialized partition function
     * @return the serialized reference id of the new {@link PythonPartition}
     */
    public byte[] createPyPartition(byte[] pyPartition) {
        PythonPartition partition = new PythonPartition(pyPartition);
        return this.serializer.serialize(this.registerReference(partition));
    }

    /**
     * Invokes a static method reflectively.
     *
     * <p>The deserialized payload is a list {@code [className, funcName, arg0, arg1, ...]}.
     * Reference ids among the elements are replaced by the objects they denote before the call.
     * The result (possibly {@code null}) is registered in the reference table.
     *
     * @param paramsBytes serialized parameter list
     * @return the serialized reference id of the invocation result
     * @throws RuntimeException wrapping any failure (malformed payload, class or method lookup,
     *     or an exception thrown by the invoked method)
     */
    public byte[] callFunction(byte[] paramsBytes) {
        try {
            @SuppressWarnings("unchecked")
            List<Object> rawParams = (List<Object>) this.serializer.deserialize(paramsBytes);
            List<Object> params = this.processReferenceParameters(rawParams);
            LOG.info("callFunction params {}", params);
            if (params.size() < 2) {
                throw new IllegalArgumentException("callFunction expects [className, funcName, args...] but got " + params.size() + " element(s)");
            }
            String className = (String) params.get(0);
            String funcName = (String) params.get(1);
            Class<?> clz = Class.forName(className, true, this.getClass().getClassLoader());
            Method method = ReflectionUtils.findMethod(clz, funcName);
            // Static invocation: the receiver is null, the remaining elements are the arguments.
            Object result = method.invoke(null, params.subList(2, params.size()).toArray());
            return this.serializer.serialize(this.registerReference(result));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Invokes an instance method reflectively.
     *
     * <p>The deserialized payload is a list {@code [target, methodName, arg0, arg1, ...]} where
     * {@code target} is normally a reference id that resolves to a registered object. Reference
     * ids among the elements are replaced by the objects they denote before the call. The result
     * (possibly {@code null}) is registered in the reference table.
     *
     * @param paramsBytes serialized parameter list
     * @return the serialized reference id of the invocation result
     * @throws RuntimeException wrapping any failure (malformed payload, null target, method
     *     lookup, or an exception thrown by the invoked method)
     */
    public byte[] callMethod(byte[] paramsBytes) {
        try {
            @SuppressWarnings("unchecked")
            List<Object> rawParams = (List<Object>) this.serializer.deserialize(paramsBytes);
            List<Object> params = this.processReferenceParameters(rawParams);
            LOG.info("callMethod params {}", params);
            if (params.size() < 2) {
                throw new IllegalArgumentException("callMethod expects [target, methodName, args...] but got " + params.size() + " element(s)");
            }
            Object obj = params.get(0);
            String methodName = (String) params.get(1);
            if (obj == null) {
                throw new IllegalArgumentException("callMethod target is null for method " + methodName);
            }
            Method method = ReflectionUtils.findMethod(obj.getClass(), methodName);
            Object result = method.invoke(obj, params.subList(2, params.size()).toArray());
            return this.serializer.serialize(this.registerReference(result));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Resolves every element of {@code params} through {@link #processReferenceParameter}. */
    private List<Object> processReferenceParameters(List<Object> params) {
        return params.stream().map(this::processReferenceParameter).collect(Collectors.toList());
    }

    /**
     * Returns the registered object when {@code o} is a string that is a key of the reference
     * table with a non-null value; otherwise returns {@code o} unchanged.
     */
    private Object processReferenceParameter(Object o) {
        if (o instanceof String) {
            Object value = this.referenceMap.get(o);
            if (value != null) {
                return value;
            }
        }
        return o;
    }

    /**
     * Stores {@code o} in the reference table and returns the id under which it was stored.
     *
     * <p>The id is normally {@link #getReferenceId(Object)}. Identity hash codes are not
     * guaranteed to be unique, so if that id is already taken by a <em>different</em> object a
     * numeric suffix is appended until a free id (or the id already holding {@code o}) is found.
     * This keeps an earlier reference from being silently overwritten.
     */
    private String registerReference(Object o) {
        String baseId = this.getReferenceId(o);
        String id = baseId;
        int suffix = 0;
        Object existing = this.referenceMap.get(id);
        while (existing != null && existing != o) {
            id = baseId + "_" + ++suffix;
            existing = this.referenceMap.get(id);
        }
        this.referenceMap.put(id, o);
        return id;
    }

    /** Builds the base reference id of {@code o} from its identity hash code. */
    private String getReferenceId(Object o) {
        return REFERENCE_ID_PREFIX + System.identityHashCode(o);
    }
}

