/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.DataMessage;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.tasks.StreamTask;

public abstract class InputStreamTask
extends StreamTask {
    private volatile boolean running = true;
    private volatile boolean stopped = false;
    private long readTimeoutMillis;
    private final Serializer javaSerializer;
    private final Serializer crossLangSerializer;

    public InputStreamTask(int taskId, Processor processor, JobWorker streamWorker) {
        super(taskId, processor, streamWorker);
        this.readTimeoutMillis = Long.parseLong(streamWorker.getConfig().getOrDefault("read_timeout_ms", "10"));
        this.javaSerializer = new JavaSerializer();
        this.crossLangSerializer = new CrossLangSerializer();
    }

    @Override
    protected void init() {
    }

    @Override
    public void run() {
        while (this.running) {
            DataMessage item = this.reader.read(this.readTimeoutMillis);
            if (item == null) continue;
            byte[] bytes = new byte[item.body().remaining() - 1];
            byte typeId = item.body().get();
            item.body().get(bytes);
            Object obj = typeId == 1 ? this.javaSerializer.deserialize(bytes) : this.crossLangSerializer.deserialize(bytes);
            this.processor.process(obj);
        }
        this.stopped = true;
    }

    @Override
    protected void cancelTask() throws Exception {
        this.running = false;
        while (!this.stopped) {
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("InputStreamTask{");
        sb.append("taskId=").append(this.taskId);
        sb.append(", processor=").append(this.processor);
        sb.append('}');
        return sb.toString();
    }
}

