/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.master.resourcemanager;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.ray.api.Ray;
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.streaming.runtime.config.StreamingMasterConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.config.types.SlotAssignStrategyType;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.Resources;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import org.ray.streaming.runtime.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.master.scheduler.strategy.SlotAssignStrategyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ResourceManager} implementation that keeps the registered {@link Container}s in
 * {@link Resources} in step with the nodes reported by Ray.
 *
 * <p>On construction a single-threaded scheduler is started that periodically calls
 * {@link #checkAndUpdateResources()}: nodes that have no matching container are registered, and
 * containers whose node is no longer reported are unregistered.
 *
 * <p>NOTE(review): the periodic task mutates {@code resources.getRegisterContainers()} from the
 * scheduler thread while the public methods read the same state from caller threads. Whether
 * that collection is safe for concurrent use is not visible from this class - confirm.
 */
public class ResourceManagerImpl
implements ResourceManager {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerImpl.class);

    /** Name of the resource set to 1.0 on a node when it is registered and 0.0 when unregistered. */
    private static final String CONTAINER_ENGAGED_KEY = "CONTAINER_ENGAGED_KEY";

    private final JobRuntimeContext runtimeContext;
    private final ResourceConfig resourceConfig;
    private final SlotAssignStrategy slotAssignStrategy;
    private final Resources resources;
    private final ScheduledExecutorService scheduledExecutorService;

    /**
     * Builds the resource state from the master configuration, selects the slot assign strategy
     * and starts the periodic node check.
     *
     * @param runtimeContext job runtime context; its master config supplies the resource config
     */
    public ResourceManagerImpl(JobRuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
        StreamingMasterConfig masterConfig = runtimeContext.getConf().masterConfig;
        this.resourceConfig = masterConfig.resourceConfig;
        this.resources = new Resources(this.resourceConfig);
        LOG.info("ResourceManagerImpl begin init, conf is {}, resources are {}.", this.resourceConfig, this.resources);

        SlotAssignStrategyType slotAssignStrategyType = SlotAssignStrategyType.PIPELINE_FIRST_STRATEGY;
        this.slotAssignStrategy = SlotAssignStrategyFactory.getStrategy(slotAssignStrategyType);
        this.slotAssignStrategy.setResources(this.resources);
        LOG.info("Slot assign strategy: {}.", this.slotAssignStrategy.getName());

        // Scheduled last so that every field the task reads is already assigned when it first runs.
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        long intervalSecond = this.resourceConfig.resourceCheckIntervalSecond();
        this.scheduledExecutorService.scheduleAtFixedRate(
            Ray.wrapRunnable(this::checkAndUpdateResources), 0L, intervalSecond, TimeUnit.SECONDS);
        LOG.info("ResourceManagerImpl init success.");
    }

    /**
     * Subtracts the required amounts from the container's available resources.
     *
     * <p>Only resource names present in both the container's available resources and
     * {@code requireResource} are touched. No check is made that enough is available, so an
     * available value may become negative.
     *
     * @param container container whose available resources are reduced in place
     * @param requireResource required amount per resource name
     * @return the subset of {@code requireResource} whose names exist in the container
     */
    @Override
    public Map<String, Double> allocateResource(Container container, Map<String, Double> requireResource) {
        LOG.info("Start to allocate resource for actor with container: {}.", container);
        Map<String, Double> allocated = new HashMap<>();
        for (Map.Entry<String, Double> entry : container.getAvailableResource().entrySet()) {
            String resourceName = entry.getKey();
            if (!requireResource.containsKey(resourceName)) {
                continue;
            }
            Double required = requireResource.get(resourceName);
            entry.setValue(entry.getValue() - required);
            allocated.put(resourceName, required);
        }
        LOG.info("Allocate resource: {} to container {}.", requireResource, container);
        return allocated;
    }

    /**
     * Adds the released amounts back to the container's available resources.
     *
     * <p>Only resource names present in both the container's available resources and
     * {@code releaseResource} are touched.
     *
     * @param container container whose available resources are increased in place
     * @param releaseResource released amount per resource name
     */
    @Override
    public void deallocateResource(Container container, Map<String, Double> releaseResource) {
        LOG.info("Deallocating resource for container {}.", container);
        for (Map.Entry<String, Double> entry : container.getAvailableResource().entrySet()) {
            String resourceName = entry.getKey();
            if (!releaseResource.containsKey(resourceName)) {
                continue;
            }
            Double released = releaseResource.get(resourceName);
            double availableResource = entry.getValue() + released;
            LOG.info("Release source {}:{}", resourceName, released);
            entry.setValue(availableResource);
        }
        LOG.info("Deallocated resource for container {} success.", container);
    }

    /**
     * @return a new list holding the currently registered containers; changes to the returned
     *     list do not affect the registry
     */
    @Override
    public List<Container> getRegisteredContainers() {
        return new ArrayList<>(this.resources.getRegisterContainers());
    }

    /** @return the slot assign strategy chosen in the constructor */
    @Override
    public SlotAssignStrategy getSlotAssignStrategy() {
        return this.slotAssignStrategy;
    }

    /** @return the resource state managed by this instance */
    @Override
    public Resources getResources() {
        return this.resources;
    }

    /**
     * Compares the nodes reported by Ray with the registered containers, then registers the
     * nodes that have no container and unregisters the containers whose node is gone.
     *
     * <p>Runtime failures are logged instead of propagated: a task scheduled with
     * {@code scheduleAtFixedRate} is never run again once one execution throws, so a single
     * failed check would otherwise stop all future checks.
     */
    private void checkAndUpdateResources() {
        try {
            List<NodeInfo> latestNodeInfos = Ray.getRuntimeContext().getAllNodeInfo();

            // Nodes reported by Ray that no registered container refers to.
            List<NodeInfo> addNodes = latestNodeInfos.stream()
                .filter(nodeInfo -> this.resources.getRegisterContainers().stream()
                    .noneMatch(container -> container.getNodeId().equals(nodeInfo.nodeId)))
                .collect(Collectors.toList());

            // Registered containers whose node Ray no longer reports. Collected into a separate
            // list first so the registry is not modified while it is being streamed.
            List<Container> deleteContainers = this.resources.getRegisterContainers().stream()
                .filter(container -> latestNodeInfos.stream()
                    .noneMatch(nodeInfo -> nodeInfo.nodeId.equals(container.getNodeId())))
                .collect(Collectors.toList());

            LOG.info("Latest node infos: {}, current containers: {}, add nodes: {}, delete nodes: {}.",
                latestNodeInfos, this.resources.getRegisterContainers(), addNodes, deleteContainers);

            for (NodeInfo node : addNodes) {
                this.registerContainer(node);
            }
            for (Container container : deleteContainers) {
                this.unregisterContainer(container);
            }
        } catch (RuntimeException e) {
            LOG.error("Failed to check and update resources.", e);
        }
    }

    /**
     * Creates a container for the node, sets its Ray resources and adds it to the registry.
     *
     * @param nodeInfo node to create the container from
     */
    private void registerContainer(NodeInfo nodeInfo) {
        LOG.info("Register container {}.", nodeInfo);
        Container container = new Container(nodeInfo.nodeId, nodeInfo.nodeAddress, nodeInfo.nodeHostname);
        container.setAvailableResource(nodeInfo.resources);
        // The resource named after the container is set to the configured per-container actor limit.
        Ray.setResource(container.getNodeId(), container.getName(), this.resources.getMaxActorNumPerContainer());
        Ray.setResource(container.getNodeId(), CONTAINER_ENGAGED_KEY, 1.0);
        this.resources.getRegisterContainers().add(container);
    }

    /**
     * Sets the container's Ray resources to zero and removes it from the registry.
     *
     * @param container container to remove
     */
    private void unregisterContainer(Container container) {
        LOG.info("Unregister container {}.", container);
        Ray.setResource(container.getNodeId(), container.getName(), 0.0);
        Ray.setResource(container.getNodeId(), CONTAINER_ENGAGED_KEY, 0.0);
        this.resources.getRegisterContainers().remove(container);
    }
}

