/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.python;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Primitives;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.api.stream.DataStream;
import io.ray.streaming.api.stream.Stream;
import io.ray.streaming.python.PythonFunction;
import io.ray.streaming.python.PythonPartition;
import io.ray.streaming.python.stream.PythonDataStream;
import io.ray.streaming.python.stream.PythonStreamSource;
import io.ray.streaming.runtime.serialization.MsgPackSerializer;
import io.ray.streaming.runtime.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gateway that exposes streaming API objects through methods taking and returning
 * serialized {@code byte[]} payloads (presumably invoked from the Python side of the
 * streaming API; confirm against the caller).
 *
 * <p>Values that the {@link MsgPackSerializer} cannot encode are kept in an internal
 * reference map and handed out as string ids starting with {@value #REFERENCE_ID_PREFIX}.
 * When such an id later appears as a parameter it is replaced by the stored object.
 *
 * <p>This class performs no synchronization of its own: the reference map is a plain
 * {@link HashMap}. Entries are never removed from the map.
 */
public class PythonGateway {
    private static final Logger LOG = LoggerFactory.getLogger(PythonGateway.class);
    private static final String REFERENCE_ID_PREFIX = "__gateway_reference_id__";
    private static final MsgPackSerializer serializer = new MsgPackSerializer();
    /** Objects handed out by reference, keyed by their reference id. */
    private final Map<String, Object> referenceMap = new HashMap<>();
    private StreamingContext streamingContext;

    public PythonGateway() {
        LOG.info("PythonGateway created");
    }

    /**
     * Builds a new {@link StreamingContext}, remembers it as the current context and
     * registers it in the reference map.
     *
     * @return the serialized reference id of the new context
     */
    public byte[] createStreamingContext() {
        this.streamingContext = StreamingContext.buildContext();
        LOG.info("StreamingContext created");
        return this.serializeReference(this.streamingContext);
    }

    /**
     * @return the current streaming context, or {@code null} if
     *     {@link #createStreamingContext()} has not been called yet
     */
    public StreamingContext getStreamingContext() {
        return this.streamingContext;
    }

    /**
     * Applies a serialized configuration map to the current streaming context.
     *
     * @param confBytes serialized map of configuration entries
     * @return a one-byte array used as a placeholder result
     * @throws NullPointerException if no streaming context has been created
     * @throws RuntimeException wrapping any failure while decoding or applying the config
     */
    public byte[] withConfig(byte[] confBytes) {
        Preconditions.checkNotNull(this.streamingContext, "StreamingContext has not been created");
        try {
            // Raw Map: the element types expected by StreamingContext#withConfig are not visible here.
            Map config = (Map) serializer.deserialize(confBytes);
            LOG.info("Set config {}", config);
            this.streamingContext.withConfig(config);
            return new byte[1];
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a {@link PythonStreamSource} on the current context from a serialized
     * python source function.
     *
     * @param pySourceFunc payload passed to the {@link PythonFunction} constructor
     * @return the serialized reference id of the created stream source
     * @throws NullPointerException if no streaming context has been created
     * @throws RuntimeException wrapping any failure while creating the source
     */
    public byte[] createPythonStreamSource(byte[] pySourceFunc) {
        Preconditions.checkNotNull(this.streamingContext, "StreamingContext has not been created");
        try {
            PythonStreamSource pythonStreamSource =
                    PythonStreamSource.from(this.streamingContext, new PythonFunction(pySourceFunc));
            return this.serializeReference(pythonStreamSource);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the job described by the current streaming context.
     *
     * @param jobNameBytes serialized job name
     * @return a one-byte array used as a placeholder result
     * @throws NullPointerException if no streaming context has been created
     */
    public byte[] execute(byte[] jobNameBytes) {
        // Same guard as withConfig/createPythonStreamSource, so a missing context fails with a clear message.
        Preconditions.checkNotNull(this.streamingContext, "StreamingContext has not been created");
        LOG.info("Starting executing");
        this.streamingContext.execute((String) serializer.deserialize(jobNameBytes));
        return new byte[1];
    }

    /**
     * Wraps the payload in a {@link PythonFunction} and registers it.
     *
     * @param pyFunc payload passed to the {@link PythonFunction} constructor
     * @return the serialized reference id of the created function
     */
    public byte[] createPyFunc(byte[] pyFunc) {
        return this.serializeReference(new PythonFunction(pyFunc));
    }

    /**
     * Wraps the payload in a {@link PythonPartition} and registers it.
     *
     * @param pyPartition payload passed to the {@link PythonPartition} constructor
     * @return the serialized reference id of the created partition
     */
    public byte[] createPyPartition(byte[] pyPartition) {
        return this.serializeReference(new PythonPartition(pyPartition));
    }

    /**
     * Unions the first stream in the parameter list with all remaining ones.
     *
     * @param paramsBytes serialized list of at least two streams (reference ids are resolved)
     * @return the serialized union stream, or its reference id if it cannot be serialized
     * @throws IllegalArgumentException if fewer than two streams are given, or the first
     *     stream is neither a {@link DataStream} nor a {@link PythonDataStream}
     */
    public byte[] union(byte[] paramsBytes) {
        Stream unionStream;
        List<Object> streams = (List<Object>) serializer.deserialize(paramsBytes);
        streams = this.processParameters(streams);
        LOG.info("Call union with streams {}", streams);
        Preconditions.checkArgument(streams.size() >= 2, "Union needs at least two streams");
        Stream stream1 = (Stream) streams.get(0);
        List<Object> otherStreams = streams.subList(1, streams.size());
        if (stream1 instanceof DataStream) {
            DataStream dataStream = (DataStream) stream1;
            unionStream = dataStream.union(otherStreams);
        } else {
            Preconditions.checkArgument(stream1 instanceof PythonDataStream);
            PythonDataStream pythonDataStream = (PythonDataStream) stream1;
            unionStream = pythonDataStream.union(otherStreams);
        }
        return this.serialize(unionStream);
    }

    /**
     * Invokes a static method by reflection.
     *
     * @param paramsBytes serialized list {@code [className, functionName, arg...]};
     *     reference ids among the elements are resolved to the stored objects
     * @return the serialized result, or its reference id if it cannot be serialized
     * @throws RuntimeException wrapping any failure (malformed parameters, unknown class
     *     or method, or an exception thrown by the invoked method)
     */
    public byte[] callFunction(byte[] paramsBytes) {
        try {
            List<Object> params = this.processParameters((List<Object>) serializer.deserialize(paramsBytes));
            LOG.info("callFunction params {}", params);
            Preconditions.checkArgument(
                    params.size() >= 2, "callFunction needs a class name and a function name");
            String className = (String) params.get(0);
            String funcName = (String) params.get(1);
            Class<?> clz = Class.forName(className, true, this.getClass().getClassLoader());
            // A null receiver makes Method#invoke treat the target as a static method.
            Object result = PythonGateway.invoke(clz, null, funcName, params.subList(2, params.size()));
            return this.serialize(result);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Invokes an instance method by reflection.
     *
     * @param paramsBytes serialized list {@code [receiver, methodName, arg...]};
     *     reference ids among the elements are resolved to the stored objects
     * @return the serialized result, or its reference id if it cannot be serialized
     * @throws RuntimeException wrapping any failure (malformed parameters, unknown method,
     *     or an exception thrown by the invoked method)
     */
    public byte[] callMethod(byte[] paramsBytes) {
        try {
            List<Object> params = this.processParameters((List<Object>) serializer.deserialize(paramsBytes));
            LOG.info("callMethod params {}", params);
            Preconditions.checkArgument(
                    params.size() >= 2, "callMethod needs a receiver and a method name");
            Object obj = params.get(0);
            String methodName = (String) params.get(1);
            Object result =
                    PythonGateway.invoke(obj.getClass(), obj, methodName, params.subList(2, params.size()));
            return this.serialize(result);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Looks up {@code name} on {@code owner} for the given arguments and invokes it on {@code target}. */
    private static Object invoke(Class<?> owner, Object target, String name, List<Object> args)
            throws ReflectiveOperationException {
        Method method = PythonGateway.findMethod(owner, name, PythonGateway.argumentTypes(args));
        return method.invoke(target, args.toArray());
    }

    /** Returns the runtime class of every argument; a {@code null} argument yields a {@code null} entry. */
    private static Class<?>[] argumentTypes(List<Object> args) {
        Class<?>[] types = new Class<?>[args.size()];
        for (int i = 0; i < types.length; i++) {
            Object arg = args.get(i);
            if (arg != null) {
                types[i] = arg.getClass();
            }
        }
        return types;
    }

    /**
     * Finds a method named {@code methodName} on {@code cls} that accepts the given argument types.
     *
     * <p>If exactly one method carries that name it is returned without checking the
     * parameter types. Otherwise candidates are tried in two passes: first with the
     * strict rules (exact match, exact match after unboxing all argument types, or plain
     * assignability), then additionally treating a primitive parameter as accepting its
     * wrapper type. The second pass only runs when the first one finds nothing.
     *
     * @param paramsTypes runtime classes of the arguments; a {@code null} entry stands for
     *     a {@code null} argument and matches any non-primitive parameter
     * @throws IllegalArgumentException if no candidate accepts the argument types
     */
    private static Method findMethod(Class<?> cls, String methodName, Class<?>[] paramsTypes) {
        List<Method> methods = ReflectionUtils.findMethods(cls, methodName);
        if (methods.size() == 1) {
            return methods.get(0);
        }
        // Argument classes come from Object#getClass and are therefore boxed; also try their primitive forms.
        Class<?>[] unwrappedTypes = new Class<?>[paramsTypes.length];
        for (int i = 0; i < paramsTypes.length; i++) {
            Class<?> type = paramsTypes[i];
            if (type != null) {
                unwrappedTypes[i] = Primitives.unwrap(type);
            }
        }
        for (boolean allowBoxing : new boolean[] {false, true}) {
            for (Method candidate : methods) {
                if (PythonGateway.isApplicable(candidate, paramsTypes, unwrappedTypes, allowBoxing)) {
                    return candidate;
                }
            }
        }
        throw new IllegalArgumentException(
                String.format(
                        "Method %s with type %s doesn't exist on class %s",
                        methodName, Arrays.toString(paramsTypes), cls));
    }

    /**
     * Tells whether {@code method} can be called with arguments of the given types.
     *
     * @param allowBoxing when {@code true}, a primitive parameter also accepts its wrapper type
     */
    private static boolean isApplicable(
            Method method, Class<?>[] argTypes, Class<?>[] unwrappedArgTypes, boolean allowBoxing) {
        Class<?>[] declared = method.getParameterTypes();
        if (Arrays.equals(declared, argTypes) || Arrays.equals(declared, unwrappedArgTypes)) {
            return true;
        }
        if (declared.length != argTypes.length) {
            return false;
        }
        for (int i = 0; i < declared.length; i++) {
            Class<?> actual = argTypes[i];
            if (actual == null) {
                // A null argument fits every reference type but can never be unboxed.
                if (declared[i].isPrimitive()) {
                    return false;
                }
                continue;
            }
            Class<?> expected = allowBoxing ? Primitives.wrap(declared[i]) : declared[i];
            if (!expected.isAssignableFrom(actual)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Serializes {@code value}; if the serializer rejects a non-basic value, the value is
     * registered in the reference map and its reference id is serialized instead.
     */
    private byte[] serialize(Object value) {
        if (PythonGateway.isBasic(value)) {
            // Basic values are never returned by reference; a serializer failure propagates.
            return serializer.serialize(value);
        }
        try {
            return serializer.serialize(value);
        } catch (Exception e) {
            LOG.debug("Value of type {} is not serializable, returning a reference", value.getClass(), e);
            return this.serializeReference(value);
        }
    }

    /** Tells whether {@code value} is null, a Boolean, a Number, a String or a byte array. */
    private static boolean isBasic(Object value) {
        return value == null
                || value instanceof Boolean
                || value instanceof Number
                || value instanceof String
                || value instanceof byte[];
    }

    /**
     * Instantiates a class through its no-argument constructor and registers the instance.
     *
     * @param classNameBytes serialized fully qualified class name
     * @return the serialized reference id of the new instance
     * @throws IllegalArgumentException if the class cannot be found or instantiated
     */
    public byte[] newInstance(byte[] classNameBytes) {
        String className = (String) serializer.deserialize(classNameBytes);
        try {
            Class<?> clz = Class.forName(className, true, this.getClass().getClassLoader());
            // Class#newInstance is deprecated and rethrows constructor checked exceptions undeclared.
            Object instance = clz.getDeclaredConstructor().newInstance();
            return this.serializeReference(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    String.format("Create instance for class %s failed", className), e);
        }
    }

    /** Applies {@link #processParameter(Object)} to every element and returns the results as a new list. */
    private List<Object> processParameters(List<Object> params) {
        return params.stream().map(this::processParameter).collect(Collectors.toList());
    }

    /**
     * Resolves a single incoming parameter: a string that is a known reference id is
     * replaced by the stored object, and {@code Byte}/{@code Short} values are widened to
     * {@code Integer}. Everything else is returned unchanged.
     */
    private Object processParameter(Object o) {
        if (o instanceof String) {
            Object referenced = this.referenceMap.get(o);
            if (referenced != null) {
                return referenced;
            }
        }
        if (o instanceof Byte || o instanceof Short) {
            return ((Number) o).intValue();
        }
        return o;
    }

    /** Registers {@code target} in the reference map and returns its serialized reference id. */
    private byte[] serializeReference(Object target) {
        return serializer.serialize(this.registerReference(target));
    }

    /**
     * Stores {@code target} in the reference map and returns the id it is stored under.
     * Registering the same object again yields the same id.
     */
    private String registerReference(Object target) {
        String baseId = this.getReferenceId(target);
        String id = baseId;
        int attempt = 0;
        // identityHashCode is not guaranteed to be unique; probe with a suffix so that a
        // colliding object never overwrites a different, already registered one.
        Object existing = this.referenceMap.get(id);
        while (existing != null && existing != target) {
            id = baseId + "_" + (++attempt);
            existing = this.referenceMap.get(id);
        }
        this.referenceMap.put(id, target);
        return id;
    }

    /** Builds the base reference id of {@code o} from the prefix and its identity hash code. */
    private String getReferenceId(Object o) {
        return REFERENCE_ID_PREFIX + System.identityHashCode(o);
    }
}

