/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.transfer;

import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.ray.api.id.ActorId;
import org.ray.api.id.BaseId;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.transfer.ChannelUtils;
import org.ray.streaming.runtime.util.Platform;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataWriter.class);
    private long nativeWriterPtr;
    private ByteBuffer buffer = ByteBuffer.allocateDirect(0);
    private long bufferAddress;

    public DataWriter(List<String> outputChannels, List<ActorId> toActors, Map<String, String> conf) {
        this.ensureBuffer(0);
        Preconditions.checkArgument(!outputChannels.isEmpty());
        Preconditions.checkArgument(outputChannels.size() == toActors.size());
        byte[][] outputChannelsBytes = (byte[][])outputChannels.stream().map(ChannelID::idStrToBytes).toArray(x$0 -> new byte[x$0][]);
        byte[][] toActorsBytes = (byte[][])toActors.stream().map(BaseId::getBytes).toArray(x$0 -> new byte[x$0][]);
        long channelSize = Long.parseLong(conf.getOrDefault("channel_size", Config.CHANNEL_SIZE_DEFAULT));
        long[] msgIds = new long[outputChannels.size()];
        for (int i = 0; i < outputChannels.size(); ++i) {
            msgIds[i] = 0L;
        }
        String channelType = conf.getOrDefault("channel_type", "native_channel");
        boolean isMock = false;
        if ("memory_channel".equals(channelType)) {
            isMock = true;
        }
        this.nativeWriterPtr = DataWriter.createWriterNative(outputChannelsBytes, toActorsBytes, msgIds, channelSize, ChannelUtils.toNativeConf(conf), isMock);
        LOGGER.info("create DataWriter succeed");
    }

    public void write(ChannelID id, ByteBuffer item) {
        int size = item.remaining();
        this.ensureBuffer(size);
        this.buffer.clear();
        this.buffer.put(item);
        this.writeMessageNative(this.nativeWriterPtr, id.getNativeIdPtr(), this.bufferAddress, size);
    }

    public void write(Set<ChannelID> ids, ByteBuffer item) {
        int size = item.remaining();
        this.ensureBuffer(size);
        for (ChannelID id : ids) {
            this.buffer.clear();
            this.buffer.put(item.duplicate());
            this.writeMessageNative(this.nativeWriterPtr, id.getNativeIdPtr(), this.bufferAddress, size);
        }
    }

    private void ensureBuffer(int size) {
        if (this.buffer.capacity() < size) {
            this.buffer = ByteBuffer.allocateDirect(size);
            this.buffer.order(ByteOrder.nativeOrder());
            this.bufferAddress = Platform.getAddress(this.buffer);
        }
    }

    public void stop() {
        this.stopWriterNative(this.nativeWriterPtr);
    }

    public void close() {
        if (this.nativeWriterPtr == 0L) {
            return;
        }
        LOGGER.info("closing data writer.");
        this.closeWriterNative(this.nativeWriterPtr);
        this.nativeWriterPtr = 0L;
        LOGGER.info("closing data writer done.");
    }

    private static native long createWriterNative(byte[][] var0, byte[][] var1, long[] var2, long var3, byte[] var5, boolean var6);

    private native long writeMessageNative(long var1, long var3, long var5, int var7);

    private native void stopWriterNative(long var1);

    private native void closeWriterNative(long var1);
}

