/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.core.collector;

import io.ray.api.BaseActor;
import io.ray.api.RayPyActor;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.message.Record;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.ChannelID;
import io.ray.streaming.runtime.transfer.DataWriter;
import java.nio.ByteBuffer;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Collector} that serializes each {@link Record} and hands it to a
 * {@link DataWriter}, once per output channel selected by the configured
 * {@link Partition}.
 *
 * <p>Each target actor is classified as Python (when it is a {@link RayPyActor})
 * or Java at construction time. Channels whose target is Java receive the record
 * encoded with the Java serializer; all other channels receive it encoded with the
 * cross-language serializer. Every payload is prefixed with a single tag byte
 * identifying which of the two encodings was used.
 */
public class OutputCollector
implements Collector<Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);

    /** Tag byte written in front of a payload produced by the Java serializer. */
    private static final byte JAVA_TYPE_ID = 1;
    /** Tag byte written in front of a payload produced by the cross-language serializer. */
    private static final byte CROSS_LANG_TYPE_ID = 0;

    private final DataWriter writer;
    private final ChannelID[] outputQueues;
    private final Collection<BaseActor> targetActors;
    private final Language[] targetLanguages;
    private final Partition partition;
    private final Serializer javaSerializer = new JavaSerializer();
    private final Serializer crossLangSerializer = new CrossLangSerializer();

    /**
     * Creates a collector bound to the given writer and output channels.
     *
     * @param writer writer that receives the serialized records
     * @param outputQueueIds identifiers of the output channels, converted with
     *     {@code ChannelID.from} in iteration order
     * @param targetActors actors used to derive the per-index target language;
     *     NOTE(review): presumably aligned index-for-index with
     *     {@code outputQueueIds} -- confirm against callers
     * @param partition strategy that selects the output indexes for each record
     */
    public OutputCollector(DataWriter writer, Collection<String> outputQueueIds, Collection<BaseActor> targetActors, Partition partition) {
        this.writer = writer;
        this.outputQueues = outputQueueIds.stream()
            .map(ChannelID::from)
            .toArray(ChannelID[]::new);
        this.targetActors = targetActors;
        this.targetLanguages = targetActors.stream()
            .map(OutputCollector::languageOf)
            .toArray(Language[]::new);
        this.partition = partition;
        LOGGER.debug("OutputCollector constructed, outputQueueIds:{}, partition:{}.", outputQueueIds, this.partition);
    }

    /**
     * Serializes {@code record} and writes it to every channel chosen by the partition.
     *
     * <p>The record is serialized at most once per encoding within a single call;
     * each write receives its own {@link ByteBuffer#duplicate() duplicate} of the
     * shared encoded buffer.
     *
     * @param record the record to emit
     */
    @Override
    public void collect(Record record) {
        final int[] selected = partition.partition(record, outputQueues.length);
        // Encoded lazily so a record is only serialized for the languages actually targeted.
        ByteBuffer javaEncoded = null;
        ByteBuffer crossLangEncoded = null;
        for (final int index : selected) {
            final ByteBuffer outgoing;
            if (targetLanguages[index] == Language.JAVA) {
                if (javaEncoded == null) {
                    javaEncoded = prefixWithTypeId(JAVA_TYPE_ID, javaSerializer.serialize(record));
                }
                outgoing = javaEncoded;
            } else {
                if (crossLangEncoded == null) {
                    crossLangEncoded = prefixWithTypeId(CROSS_LANG_TYPE_ID, crossLangSerializer.serialize(record));
                }
                outgoing = crossLangEncoded;
            }
            // duplicate() gives the write its own position/limit over the shared content.
            writer.write(outputQueues[index], outgoing.duplicate());
        }
    }

    /** Returns {@link Language#PYTHON} for a {@link RayPyActor}, otherwise {@link Language#JAVA}. */
    private static Language languageOf(BaseActor actor) {
        if (actor instanceof RayPyActor) {
            return Language.PYTHON;
        }
        return Language.JAVA;
    }

    /** Builds a read-ready buffer holding {@code typeId} followed by {@code payload}. */
    private static ByteBuffer prefixWithTypeId(byte typeId, byte[] payload) {
        final ByteBuffer framed = ByteBuffer.allocate(1 + payload.length);
        framed.put(typeId);
        framed.put(payload);
        framed.flip();
        return framed;
    }
}

