/*
 * Decompiled with CFR 0.152.
 */
package io.ray.serve.poll;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.PyActorHandle;
import io.ray.api.Ray;
import io.ray.api.exception.RayActorException;
import io.ray.api.exception.RayTaskException;
import io.ray.api.exception.RayTimeoutException;
import io.ray.api.function.PyActorMethod;
import io.ray.serve.api.Serve;
import io.ray.serve.controller.ServeController;
import io.ray.serve.generated.ActorNameList;
import io.ray.serve.poll.KeyListener;
import io.ray.serve.poll.KeyType;
import io.ray.serve.poll.LongPollNamespace;
import io.ray.serve.poll.LongPollRequest;
import io.ray.serve.poll.LongPollResult;
import io.ray.serve.poll.UpdatedObject;
import io.ray.serve.replica.ReplicaContext;
import io.ray.serve.util.CollectionUtil;
import io.ray.serve.util.ServeProtoUtil;
import io.ray.shaded.com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide long poll client.
 *
 * <p>Listeners are registered per {@link KeyType}. A single daemon thread periodically sends the
 * last known snapshot id of every registered key to the host actor, and dispatches whatever
 * updated objects come back to the matching {@link KeyListener}.
 *
 * <p>All state is static. {@link #init}, {@link #pollNext} and {@link #stop} are serialized on
 * the class monitor; the key maps are concurrent maps so {@link #register}, {@link #unregister}
 * and {@link #processUpdate} can touch them without holding that monitor.
 */
public class LongPollClientFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(LongPollClientFactory.class);

    private static final String POLL_THREAD_NAME = "ray-serve-long-poll-client-thread";
    private static final String CONTROLLER_ACTOR_NAME = "SERVE_CONTROLLER_ACTOR";
    private static final String CONTROLLER_NAMESPACE = "serve";
    private static final String CONFIG_ENABLED = "ray.serve.long.poll.client.enabled";
    private static final String CONFIG_INTERVAL_S = "ray.serve.long.poll.client.interval_s";
    private static final String CONFIG_TIMEOUT_S = "ray.serve.long.poll.client.timeout_s";
    private static final long DEFAULT_INTERVAL_S = 10L;
    private static final long DEFAULT_TIMEOUT_S = 10L;

    /** Actor that answers the long poll requests. Assigned by {@link #init}. */
    private static BaseActorHandle hostActor;

    /** Listener to notify for each registered key. */
    private static final Map<KeyType, KeyListener> KEY_LISTENERS = new ConcurrentHashMap<>();

    /** Last snapshot id seen for each registered key; -1 until the first update arrives. */
    public static final Map<KeyType, Integer> SNAPSHOT_IDS = new ConcurrentHashMap<>();

    /** Last object snapshot delivered for each registered key. */
    public static final Map<KeyType, Object> OBJECT_SNAPSHOTS = new ConcurrentHashMap<>();

    private static ScheduledExecutorService scheduledExecutorService;

    /**
     * Thread created for the poll executor. Used by {@link #stop} to detect that it is being
     * called from the poll task itself.
     */
    private static volatile Thread pollThread;

    /** Volatile because {@link #isInitialized} and {@link #register} read it without the lock. */
    private static volatile boolean inited = false;

    /** Timeout, in seconds, for a single long poll call. */
    private static long longPollTimoutS = 1L;

    /** Deserializer for the payload of each long poll namespace. */
    public static final Map<LongPollNamespace, Function<byte[], Object>> DESERIALIZERS = new HashMap<>();

    static {
        DESERIALIZERS.put(LongPollNamespace.ROUTE_TABLE, ServeProtoUtil::parseEndpointSet);
        DESERIALIZERS.put(LongPollNamespace.RUNNING_REPLICAS, bytes -> ServeProtoUtil.bytesToProto(bytes, ActorNameList::parseFrom));
    }

    /**
     * Registers listeners and immediately polls once for their keys.
     *
     * <p>Initializes the client first if necessary. If the client ends up not initialized (it is
     * disabled by configuration), nothing is registered.
     *
     * @param hostActor actor to poll; forwarded to {@link #init}
     * @param keyListeners listeners to register, keyed by the key they watch
     */
    public static void register(BaseActorHandle hostActor, Map<KeyType, KeyListener> keyListeners) {
        init(hostActor);
        if (!inited) {
            return;
        }
        KEY_LISTENERS.putAll(keyListeners);
        for (KeyType keyType : keyListeners.keySet()) {
            // -1 marks "no snapshot seen yet" for a freshly registered key.
            SNAPSHOT_IDS.put(keyType, -1);
        }
        LOGGER.info("LongPollClient registered keys: {}.", keyListeners.keySet());
        try {
            pollNext();
        } catch (RayTimeoutException e) {
            // A timeout here is not fatal: the scheduled task will poll again.
            LOGGER.info("Register poll timeout. keys:{}", keyListeners.keySet());
        }
    }

    /**
     * Initializes the client and starts the periodic poll task. Does nothing if the client is
     * already initialized.
     *
     * <p>Poll interval, poll timeout and the enabled flag are read from the replica context
     * config when a replica context is available. If reading them fails, initialization continues
     * with whatever values were resolved up to that point.
     *
     * @param hostActor actor to poll; when {@code null} and a replica context is available, the
     *     controller actor is looked up by name
     * @throws NullPointerException if no host actor could be resolved
     */
    public static synchronized void init(BaseActorHandle hostActor) {
        if (inited) {
            return;
        }
        long intervalS = DEFAULT_INTERVAL_S;
        try {
            ReplicaContext replicaContext = Serve.getReplicaContext();
            boolean enabled = configValue(replicaContext, CONFIG_ENABLED).map(Boolean::valueOf).orElse(true);
            if (!enabled) {
                LOGGER.info("LongPollClient is disabled.");
                return;
            }
            if (null == hostActor) {
                hostActor = (BaseActorHandle)Ray.getActor(CONTROLLER_ACTOR_NAME, CONTROLLER_NAMESPACE).get();
            }
            intervalS = configValue(replicaContext, CONFIG_INTERVAL_S).map(Long::valueOf).orElse(DEFAULT_INTERVAL_S);
            longPollTimoutS = configValue(replicaContext, CONFIG_TIMEOUT_S).map(Long::valueOf).orElse(DEFAULT_TIMEOUT_S);
        } catch (Exception e) {
            // Deliberately best-effort, but keep the cause visible: this catch also covers a
            // failed actor lookup and malformed numeric config values.
            LOGGER.info("`Serve.getReplicaContext()` may only be called from within a Ray Serve deployment. Cause: {}", e.toString());
        }
        Preconditions.checkNotNull(hostActor, "LongPollClient requires a host actor, but none was given or found.");
        LongPollClientFactory.hostActor = hostActor;

        ThreadFactory daemonThreadFactory = runnable -> {
            Thread thread = new Thread(runnable, POLL_THREAD_NAME);
            thread.setDaemon(true);
            pollThread = thread;
            return thread;
        };
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
        final long pollIntervalS = intervalS;
        scheduledExecutorService.scheduleWithFixedDelay(() -> pollAndHandleErrors(pollIntervalS), 0L, pollIntervalS, TimeUnit.SECONDS);
        inited = true;
        LOGGER.info("LongPollClient was initialized");
    }

    /** Reads one string entry from the replica context config; empty if config or entry is absent. */
    private static Optional<String> configValue(ReplicaContext replicaContext, String key) {
        return Optional.ofNullable(replicaContext.getConfig()).map(config -> (String)config.get(key));
    }

    /**
     * Body of the periodic poll task. Never lets an exception escape, because an exception thrown
     * from a scheduled task would cancel all further executions.
     */
    private static void pollAndHandleErrors(long intervalS) {
        try {
            pollNext();
        } catch (RayTimeoutException e) {
            LOGGER.info("long poll timeout in {} seconds, execute next poll after {} seconds.", longPollTimoutS, intervalS);
        } catch (RayActorException e) {
            LOGGER.error("LongPollClient failed to connect to host. Shutting down.", e);
            stop();
        } catch (RayTaskException e) {
            LOGGER.error("LongPollHost errored", e);
        } catch (Throwable e) {
            LOGGER.error("LongPollClient failed to update object of key {}", SNAPSHOT_IDS, e);
        }
    }

    /**
     * Sends the current snapshot ids to the host actor, waits up to the poll timeout for the
     * reply, and dispatches the returned updates through {@link #processUpdate}.
     *
     * <p>A {@link PyActorHandle} host is called through its {@code listen_for_change_java} method
     * with a protobuf-encoded request; any other host is called as a Java {@link ServeController}.
     */
    public static synchronized void pollNext() {
        LOGGER.info("LongPollClient polls next snapshotIds {}", SNAPSHOT_IDS);
        LongPollRequest longPollRequest = new LongPollRequest(SNAPSHOT_IDS);
        long timeoutMs = longPollTimoutS * 1000L;
        LongPollResult longPollResult;
        if (hostActor instanceof PyActorHandle) {
            ObjectRef<Object> currentRef = ((PyActorHandle)hostActor).task(PyActorMethod.of("listen_for_change_java"), longPollRequest.toProtobuf().toByteArray()).remote();
            Object data = Ray.get(currentRef, timeoutMs);
            longPollResult = LongPollResult.parseFrom((byte[])data);
        } else {
            // The handle must be typed for the ServeController method reference to resolve.
            @SuppressWarnings("unchecked")
            ActorHandle<ServeController> javaController = (ActorHandle<ServeController>)hostActor;
            ObjectRef<byte[]> currentRef = javaController.task(ServeController::listenForChange, longPollRequest).remote();
            longPollResult = LongPollResult.parseFrom(currentRef.get(timeoutMs));
        }
        processUpdate(longPollResult == null ? null : longPollResult.getUpdatedObjects());
    }

    /**
     * Delivers each updated object to the listener registered for its key, then records the new
     * object snapshot and snapshot id. Updates for keys without a listener are skipped.
     *
     * @param updates updated objects by key; {@code null} or empty means nothing was received
     */
    public static void processUpdate(Map<KeyType, UpdatedObject> updates) {
        if (updates == null || updates.isEmpty()) {
            LOGGER.info("LongPollClient received nothing.");
            return;
        }
        LOGGER.info("LongPollClient received updates for keys: {}", updates.keySet());
        for (Map.Entry<KeyType, UpdatedObject> entry : updates.entrySet()) {
            KeyType keyType = entry.getKey();
            UpdatedObject updatedObject = entry.getValue();
            Object objectSnapshot = updatedObject.getObjectSnapshot();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The updated object for key {} is {}", keyType, ReflectionToStringBuilder.toString(objectSnapshot));
            }
            // Look the listener up once: a concurrent unregister() between a null check and a
            // second lookup would otherwise cause a NullPointerException.
            KeyListener keyListener = KEY_LISTENERS.get(keyType);
            if (keyListener == null) {
                LOGGER.warn("LongPollClient has no listener for key: {}, maybe this key was garbage collected.", keyType);
                continue;
            }
            keyListener.notifyChanged(objectSnapshot);
            OBJECT_SNAPSHOTS.put(keyType, objectSnapshot);
            SNAPSHOT_IDS.put(keyType, updatedObject.getSnapshotId());
        }
    }

    /**
     * Removes the listener, snapshot id and object snapshot of every given key.
     *
     * @param keys keys to stop watching; {@code null} or empty is a no-op
     */
    public static void unregister(Set<KeyType> keys) {
        if (CollectionUtil.isEmpty(keys)) {
            return;
        }
        for (KeyType keyType : keys) {
            SNAPSHOT_IDS.remove(keyType);
            KEY_LISTENERS.remove(keyType);
            OBJECT_SNAPSHOTS.remove(keyType);
        }
        LOGGER.info("LongPollClient unregistered keys: {}.", keys);
    }

    /**
     * Shuts the poll executor down and clears all registered keys. Does nothing if the client is
     * not initialized. The client can be initialized again afterwards.
     */
    public static synchronized void stop() {
        if (!inited) {
            return;
        }
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            // The poll task calls stop() itself when the host is unreachable. Waiting for
            // termination from the executor's own thread can never succeed and would only block
            // for the full timeout while holding the class lock, so skip it in that case.
            if (Thread.currentThread() != pollThread) {
                try {
                    scheduledExecutorService.awaitTermination(longPollTimoutS, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    LOGGER.error("awaitTermination error, the exception is ", e);
                    // Restore the flag so callers further up can still observe the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        }
        pollThread = null;
        KEY_LISTENERS.clear();
        OBJECT_SNAPSHOTS.clear();
        SNAPSHOT_IDS.clear();
        inited = false;
        LOGGER.info("LongPollClient was stopped.");
    }

    /**
     * @return {@code true} once {@link #init} has completed and until {@link #stop} is called
     */
    public static boolean isInitialized() {
        return inited;
    }
}

