/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.alink.operator.stream.sink;

import com.alibaba.alink.common.annotation.NameCn;
import com.alibaba.alink.common.io.annotations.AnnotationUtils;
import com.alibaba.alink.common.io.annotations.IOType;
import com.alibaba.alink.common.io.annotations.IoOpAnnotation;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.sink.BaseSinkStreamOp;
import com.alibaba.alink.params.io.SocketSinkParams;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.types.Row;
import scala.Serializable;

@IoOpAnnotation(name="socket_sink", ioType=IOType.SinkStream)
@NameCn(value="")
public class SocketSinkStreamOp
extends BaseSinkStreamOp<SocketSinkStreamOp>
implements SocketSinkParams<SocketSinkStreamOp> {
    public SocketSinkStreamOp() {
        this(new Params());
    }

    public SocketSinkStreamOp(Params params) {
        super(AnnotationUtils.annotatedName(SocketSinkStreamOp.class), params);
    }

    public SocketSinkStreamOp sinkFrom(StreamOperator<?> in) {
        DataStreamSink returnStream = in.getDataStream().addSink((SinkFunction)new SocketClientSink(this.getHost(), this.getPort().intValue(), (SerializationSchema)new PyRowSerializationSchema(), 0, true));
        returnStream.setParallelism(1);
        return this;
    }

    public static class PyRowSerializationSchema
    implements SerializationSchema<Row> {
        public Gson gson;
        public LegacyRow legacyRow = new LegacyRow();

        public byte[] serialize(Row element) {
            if (null == this.gson) {
                this.gson = new GsonBuilder().serializeSpecialFloatingPointValues().create();
            }
            this.legacyRow.fields = new Object[element.getArity()];
            for (int i = 0; i < element.getArity(); ++i) {
                this.legacyRow.fields[i] = element.getField(i);
            }
            String str = this.gson.toJson((Object)this.legacyRow) + "\r\n";
            return str.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static class LegacyRow
    implements Serializable {
        public Object[] fields;
    }
}

