/*
 * Decompiled with CFR 0.152.
 */
package io.ray.runtime.object;

import com.google.common.base.Preconditions;
import io.ray.api.id.ObjectId;
import io.ray.runtime.context.WorkerContext;
import io.ray.runtime.object.NativeRayObject;
import io.ray.runtime.object.ObjectStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link ObjectStore} that keeps every object in an in-process map keyed by
 * {@link ObjectId}.
 *
 * <p>Readers that need objects which are not stored yet poll the map at a fixed
 * interval ({@link #GET_CHECK_INTERVAL_MS}) until enough objects are present or
 * the caller's timeout runs out.
 *
 * <p>NOTE(review): the object pool is a {@link ConcurrentHashMap}, but the callback
 * list is a plain {@link ArrayList}; this presumably relies on all callbacks being
 * registered before objects are put from other threads - confirm against callers.
 */
public class LocalModeObjectStore extends ObjectStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalModeObjectStore.class);

    /** Pause, in milliseconds, between two consecutive readiness checks while waiting. */
    private static final int GET_CHECK_INTERVAL_MS = 100;

    /** All objects currently held by this store. */
    private final Map<ObjectId, NativeRayObject> pool = new ConcurrentHashMap<>();

    /** Listeners notified with the object id on every call to {@link #putRaw(NativeRayObject, ObjectId)}. */
    private final List<Consumer<ObjectId>> objectPutCallbacks = new ArrayList<>();

    /**
     * Creates an empty store.
     *
     * @param workerContext the worker context handed to the {@link ObjectStore} base class
     */
    public LocalModeObjectStore(WorkerContext workerContext) {
        super(workerContext);
    }

    /**
     * Registers a listener that is invoked with the object id each time an object is put.
     *
     * @param callback the listener to add
     */
    public void addObjectPutCallback(Consumer<ObjectId> callback) {
        this.objectPutCallbacks.add(callback);
    }

    /**
     * Tells whether an object with the given id is currently stored.
     *
     * @param id the object id to look up
     * @return {@code true} if the pool contains {@code id}
     */
    public boolean isObjectReady(ObjectId id) {
        return this.pool.containsKey(id);
    }

    /**
     * Stores an object under a freshly generated random id.
     *
     * @param obj the object to store; must not be {@code null}
     * @return the id generated for the object
     */
    @Override
    public ObjectId putRaw(NativeRayObject obj) {
        ObjectId objectId = ObjectId.fromRandom();
        this.putRaw(obj, objectId);
        return objectId;
    }

    /**
     * Stores an object under the given id and notifies all registered put callbacks.
     *
     * <p>If an object is already stored under {@code objectId} the existing object is
     * kept ({@code putIfAbsent}); the callbacks are invoked in either case.
     *
     * @param obj the object to store; must not be {@code null}
     * @param objectId the id to store it under; must not be {@code null}
     * @throws NullPointerException if {@code obj} or {@code objectId} is {@code null}
     */
    @Override
    public void putRaw(NativeRayObject obj, ObjectId objectId) {
        Preconditions.checkNotNull(obj);
        Preconditions.checkNotNull(objectId);
        this.pool.putIfAbsent(objectId, obj);
        for (Consumer<ObjectId> callback : this.objectPutCallbacks) {
            callback.accept(objectId);
        }
    }

    /**
     * Waits until all requested objects are stored or the timeout elapses, then returns
     * whatever is stored for each id.
     *
     * @param objectIds the ids to fetch
     * @param timeoutMs maximum time to wait in milliseconds; a negative value waits
     *     without a time limit
     * @return one entry per requested id, in the same order; an entry is {@code null}
     *     when no object is stored under that id at the time of the lookup
     */
    @Override
    public List<NativeRayObject> getRaw(List<ObjectId> objectIds, long timeoutMs) {
        this.waitInternal(objectIds, objectIds.size(), timeoutMs);
        return objectIds.stream().map(this.pool::get).collect(Collectors.toList());
    }

    /**
     * Waits until at least {@code numObjects} of the given objects are stored or the
     * timeout elapses, then reports the presence of each one.
     *
     * @param objectIds the ids to wait for
     * @param numObjects how many of the ids must be present before returning early
     * @param timeoutMs maximum time to wait in milliseconds; a negative value waits
     *     without a time limit
     * @return one flag per requested id, in the same order; {@code true} when an object
     *     is stored under that id at the time of the lookup
     */
    @Override
    public List<Boolean> wait(List<ObjectId> objectIds, int numObjects, long timeoutMs) {
        this.waitInternal(objectIds, numObjects, timeoutMs);
        return objectIds.stream().map(this.pool::containsKey).collect(Collectors.toList());
    }

    /**
     * Polls the pool until at least {@code numObjects} of {@code objectIds} are present
     * or the time budget is used up.
     *
     * <p>An interrupt received while sleeping does not abort the wait, but the thread's
     * interrupt status is restored before this method returns so that callers can still
     * observe it.
     *
     * @param objectIds the ids to look for
     * @param numObjects the number of present ids that ends the wait
     * @param timeoutMs time budget in milliseconds; a negative value means no limit
     */
    private void waitInternal(List<ObjectId> objectIds, int numObjects, long timeoutMs) {
        final boolean waitForever = timeoutMs < 0L;
        long remainingTime = timeoutMs;
        boolean interrupted = false;
        try {
            int ready = this.countReady(objectIds);
            while (ready < numObjects && (waitForever || remainingTime > 0L)) {
                // Never sleep past the end of the caller's time budget.
                long sleepTime = waitForever
                        ? GET_CHECK_INTERVAL_MS
                        : Math.min(remainingTime, GET_CHECK_INTERVAL_MS);
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    LOGGER.warn("Got InterruptedException while sleeping.");
                    // Thread.sleep cleared the interrupt status; remember it and re-assert
                    // it once the wait is over. Re-asserting it here would make every
                    // following sleep fail immediately and turn the poll into a busy spin.
                    interrupted = true;
                }
                remainingTime -= sleepTime;
                ready = this.countReady(objectIds);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Counts how many entries of {@code objectIds} currently have an object in the pool. */
    private int countReady(List<ObjectId> objectIds) {
        int ready = 0;
        for (ObjectId objectId : objectIds) {
            if (this.pool.containsKey(objectId)) {
                ++ready;
            }
        }
        return ready;
    }

    /**
     * Removes the given objects from the pool; ids that are not stored are skipped.
     *
     * @param objectIds the ids to remove
     * @param localOnly not used by this implementation
     * @param deleteCreatingTasks not used by this implementation
     */
    @Override
    public void delete(List<ObjectId> objectIds, boolean localOnly, boolean deleteCreatingTasks) {
        for (ObjectId objectId : objectIds) {
            this.pool.remove(objectId);
        }
    }
}

