/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.worker.tasks;

import org.ray.runtime.util.Serializer;
import org.ray.streaming.runtime.core.processor.Processor;
import org.ray.streaming.runtime.transfer.DataMessage;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.runtime.worker.tasks.StreamTask;

/**
 * Stream task that repeatedly reads {@link DataMessage}s from the inherited reader,
 * deserializes each message body with {@link Serializer#decode}, and passes the
 * resulting object to the inherited processor.
 *
 * <p>Two volatile flags coordinate shutdown: {@code running} is cleared by
 * {@link #cancelTask()} to ask the read loop to exit, and {@code stopped} is set by
 * {@link #run()} once the loop has been left.
 */
public abstract class InputStreamTask extends StreamTask {
    /** Pause between checks of {@code stopped} while {@link #cancelTask()} waits. */
    private static final long CANCEL_POLL_INTERVAL_MILLIS = 1L;

    /** Cleared by {@link #cancelTask()} to request that the read loop exit. */
    private volatile boolean running = true;
    /** Set by {@link #run()} when the read loop has been left, normally or not. */
    private volatile boolean stopped = false;
    /** Timeout handed to each {@code reader.read} call. */
    private final long readTimeoutMillis;

    /**
     * Creates the task and takes the read timeout from the worker configuration key
     * {@code "read_timeout_ms"}, falling back to {@code "10"} when the key is absent.
     *
     * @param taskId       task identifier, forwarded to the superclass
     * @param processor    processor, forwarded to the superclass
     * @param streamWorker owning worker; its configuration supplies the read timeout
     * @throws NumberFormatException if the configured timeout is not a parsable long
     */
    public InputStreamTask(int taskId, Processor processor, JobWorker streamWorker) {
        super(taskId, processor, streamWorker);
        this.readTimeoutMillis = Long.parseLong(streamWorker.getConfig().getOrDefault("read_timeout_ms", "10"));
    }

    /** No initialization is performed by this class. */
    @Override
    protected void init() {
    }

    /**
     * Reads, decodes and processes messages until {@code running} is cleared.
     *
     * <p>A {@code null} result from the reader is skipped and the loop polls again.
     * {@code stopped} is set in a {@code finally} block so that a thread waiting in
     * {@link #cancelTask()} is released even when reading, decoding or processing
     * throws.
     */
    @Override
    public void run() {
        try {
            while (this.running) {
                DataMessage item = this.reader.read(this.readTimeoutMillis);
                if (item == null) {
                    continue;
                }
                // Copy the unread part of the message body into an array for decoding.
                byte[] bytes = new byte[item.body().remaining()];
                item.body().get(bytes);
                Object obj = Serializer.decode(bytes);
                this.processor.process(obj);
            }
        } finally {
            // Always publish termination; otherwise cancelTask() would wait forever
            // after an exception escaped the loop.
            this.stopped = true;
        }
    }

    /**
     * Requests the read loop to exit and waits until {@link #run()} reports that it
     * has been left.
     *
     * <p>The wait sleeps briefly between checks instead of spinning, so it does not
     * occupy a CPU core while the loop finishes its current iteration.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Exception            declared for compatibility with the overridden method
     */
    @Override
    protected void cancelTask() throws Exception {
        this.running = false;
        while (!this.stopped) {
            Thread.sleep(CANCEL_POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns a short description containing the task id and the processor. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("InputStreamTask{");
        sb.append("taskId=").append(this.taskId);
        sb.append(", processor=").append(this.processor);
        sb.append('}');
        return sb.toString();
    }
}

