/*
 * Decompiled with CFR 0.152.
 */
package org.ray.streaming.runtime.master.scheduler.strategy.impl;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.streaming.runtime.config.types.SlotAssignStrategyType;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.ContainerID;
import org.ray.streaming.runtime.core.resource.Resources;
import org.ray.streaming.runtime.core.resource.Slot;
import org.ray.streaming.runtime.master.scheduler.strategy.SlotAssignStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Slot assignment strategy that walks the execution graph by sub-task index:
 * for every index {@code i} it visits each job vertex in turn and places that
 * vertex's {@code i}-th execution vertex on the container currently selected
 * in {@link Resources}. The selected container advances once the number of
 * vertices placed on it reaches the per-container quota computed in
 * {@link #assignSlot(ExecutionGraph)}.
 *
 * <p>{@link #setResources(Resources)} must be called before any other method
 * that reads {@code resources}.
 */
public class PipelineFirstStrategy
implements SlotAssignStrategy {
    public static final Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategy.class);
    protected Resources resources;

    /**
     * Computes how many slots each container should expose.
     *
     * @param containers the containers the slots will be spread over
     * @param maxParallelism the largest parallelism in the job
     * @return {@code ceil(max(maxParallelism, containers.size()) / containers.size())}
     */
    @Override
    public int getSlotNumPerContainer(List<Container> containers, int maxParallelism) {
        int containerCount = containers.size();
        LOG.info("max parallelism: {}, container size: {}.", maxParallelism, containerCount);
        // Floating-point division is kept on purpose so the rounding (and the
        // result for an empty container list) stays exactly as before.
        int slotNumPerContainer = (int)Math.ceil(Math.max(maxParallelism, containerCount) / (double)containerCount);
        LOG.info("slot num per container: {}.", slotNumPerContainer);
        return slotNumPerContainer;
    }

    /**
     * Creates {@code containers.size() * slotNumPerContainer} slots, hands them
     * out to the containers round-robin by slot id, and records every
     * container's slot list in the allocating map of {@code resources}.
     *
     * @param containers the containers that receive the new slots
     * @param slotNumPerContainer number of slots to create per container
     */
    @Override
    public void allocateSlot(List<Container> containers, int slotNumPerContainer) {
        int containerCount = containers.size();
        int maxSlotSize = containerCount * slotNumPerContainer;
        LOG.info("Allocate slot, maxSlotSize: {}.", maxSlotSize);
        for (int slotId = 0; slotId < maxSlotSize; ++slotId) {
            Container targetContainer = containers.get(slotId % containerCount);
            targetContainer.getSlots().add(new Slot(slotId, targetContainer.getContainerId()));
        }
        for (Container container : containers) {
            this.resources.getAllocatingMap().put(container.getContainerId(), container.getSlots());
        }
        LOG.info("Allocate slot result: {}.", this.resources.getAllocatingMap());
    }

    /**
     * Assigns every execution vertex of the graph to a slot.
     *
     * <p>The per-container quota is set to
     * {@code ceil(totalExecutionVertices / registeredContainers)}. Vertices are
     * then visited index by index across all job vertices; before each
     * placement the current container is validated (and possibly advanced) by
     * {@link #checkResource(Map)}.
     *
     * @param executionGraph the graph whose execution vertices need slots
     * @return the allocating map held by {@code resources}
     * @throws IllegalArgumentException if no registered container can host a vertex
     */
    @Override
    public Map<ContainerID, List<Slot>> assignSlot(ExecutionGraph executionGraph) {
        LOG.info("Container available resources: {}.", this.resources.getAllAvailableResource());
        Map<Integer, ExecutionJobVertex> vertices = executionGraph.getExecutionJobVertexMap();
        int totalExecutionVerticesNum = vertices.values().stream()
            .mapToInt(jobVertex -> jobVertex.getExecutionVertices().size())
            .sum();
        int containerNum = this.resources.getRegisterContainers().size();
        this.resources.setActorPerContainer((int)Math.ceil(totalExecutionVerticesNum / (double)containerNum));
        LOG.info("Total execution vertices num: {}, container num: {}, capacity per container: {}.", totalExecutionVerticesNum, containerNum, this.resources.getActorPerContainer());
        int maxParallelism = executionGraph.getMaxParallelism();
        for (int i = 0; i < maxParallelism; ++i) {
            for (ExecutionJobVertex executionJobVertex : vertices.values()) {
                List<ExecutionVertex> exeVertices = executionJobVertex.getExecutionVertices();
                if (exeVertices.size() <= i) {
                    // This job vertex has fewer than i + 1 sub-tasks.
                    continue;
                }
                ExecutionVertex executionVertex = exeVertices.get(i);
                // May move the current container index forward.
                this.checkResource(executionVertex.getResources());
                Container targetContainer = this.resources.getRegisterContainers().get(this.resources.getCurrentContainerIndex());
                List<Slot> targetSlots = targetContainer.getSlots();
                this.allocate(executionVertex, targetContainer, targetSlots.get(i % targetSlots.size()));
            }
        }
        return this.resources.getAllocatingMap();
    }

    /**
     * Advances the current container index until a container passes
     * {@link #hasEnoughResource(Map)}.
     *
     * @param requiredResource the resources the vertex needs, may be {@code null}
     * @throws IllegalArgumentException once every registered container has been
     *     tried without success
     */
    private void checkResource(Map<String, Double> requiredResource) {
        int checkedNum = 0;
        while (!this.hasEnoughResource(requiredResource)) {
            int containerCount = this.resources.getRegisterContainers().size();
            this.resources.setCurrentContainerIndex((this.resources.getCurrentContainerIndex() + 1) % containerCount);
            // Guava's Preconditions only understands %s placeholders, not {}.
            Preconditions.checkArgument(++checkedNum < containerCount, "No enough resource left, required resource: %s, available resource: %s.", requiredResource, this.resources.getAllAvailableResource());
            this.resources.setCurrentContainerAllocatedActorNum(0);
        }
    }

    /**
     * Tells whether the currently selected container can take one more vertex.
     *
     * <p>A {@code null} requirement is always accepted. Otherwise the container
     * is rejected when the vertices already recorded in its slots reach the
     * per-container quota, or when any required resource that the container
     * reports is available in a smaller amount than required. Required
     * resources the container does not report at all are not checked.
     *
     * @param requiredResource the resources the vertex needs, may be {@code null}
     * @return {@code true} if the current container is acceptable
     */
    private boolean hasEnoughResource(Map<String, Double> requiredResource) {
        LOG.info("Check resource for container, index: {}.", this.resources.getCurrentContainerIndex());
        if (null == requiredResource) {
            return true;
        }
        Container currentContainer = this.resources.getRegisterContainers().get(this.resources.getCurrentContainerIndex());
        List<Slot> slotActors = this.resources.getAllocatingMap().get(currentContainer.getContainerId());
        if (slotActors != null && slotActors.size() > 0) {
            long allocatedActorNum = slotActors.stream()
                .map(Slot::getExecutionVertexIds)
                .mapToLong(List::size)
                .sum();
            if (allocatedActorNum >= (long)this.resources.getActorPerContainer()) {
                LOG.info("Container remaining capacity is 0. used: {}, total: {}.", allocatedActorNum, this.resources.getActorPerContainer());
                return false;
            }
        }
        Map<String, Double> availableResource = currentContainer.getAvailableResource();
        for (Map.Entry<String, Double> entry : requiredResource.entrySet()) {
            String resourceName = entry.getKey();
            if (availableResource.containsKey(resourceName) && availableResource.get(resourceName) < entry.getValue()) {
                LOG.warn("No enough resource for container {}. required: {}, available: {}.", currentContainer.getAddress(), requiredResource, availableResource);
                return false;
            }
        }
        return true;
    }

    /**
     * Binds {@code vertex} to {@code slot}, records the vertex id on the slot
     * with the same id in the allocating map entry of {@code container}, and
     * bumps the allocated counter of the current container. When the counter
     * reaches the per-container quota the current container index moves to the
     * next container and the counter is reset.
     *
     * @param vertex the execution vertex being placed
     * @param container the container that owns {@code slot}
     * @param slot the slot chosen for the vertex
     * @throws java.util.NoSuchElementException if the allocating map entry of
     *     {@code container} holds no slot with the id of {@code slot}
     */
    private void allocate(ExecutionVertex vertex, Container container, Slot slot) {
        LOG.info("Set slot {} to vertex {}.", slot, vertex);
        vertex.setSlotIfNotExist(slot);
        Slot useSlot = this.resources.getAllocatingMap().get(container.getContainerId()).stream()
            .filter(candidate -> candidate.getId() == slot.getId())
            .findFirst()
            // Same exception type a bare Optional.get() would raise, but with context.
            .orElseThrow(() -> new java.util.NoSuchElementException("Slot " + slot.getId() + " is not registered for container " + container.getContainerId()));
        useSlot.getExecutionVertexIds().add(vertex.getVertexId());
        this.resources.setCurrentContainerAllocatedActorNum(this.resources.getCurrentContainerAllocatedActorNum() + 1);
        if (this.resources.getCurrentContainerAllocatedActorNum() >= this.resources.getActorPerContainer()) {
            this.resources.setCurrentContainerIndex((this.resources.getCurrentContainerIndex() + 1) % this.resources.getRegisterContainers().size());
            this.resources.setCurrentContainerAllocatedActorNum(0);
        }
    }

    /**
     * @return the configured value of {@link SlotAssignStrategyType#PIPELINE_FIRST_STRATEGY}
     */
    @Override
    public String getName() {
        return SlotAssignStrategyType.PIPELINE_FIRST_STRATEGY.getValue();
    }

    /**
     * Injects the resource bookkeeping object this strategy reads and mutates.
     *
     * @param resources the shared resource state
     */
    @Override
    public void setResources(Resources resources) {
        this.resources = resources;
    }
}

