package org.apache.flink.runtime.scheduler.adapter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.EdgeManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.flip1.SchedulingPipelinedRegionComputeUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalEdge;
import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.class */
public class DefaultExecutionTopology implements SchedulingTopology {
    /** Logger; used to report how many pipelined regions were built and how long it took. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionTopology.class);
    /** Scheduling vertices keyed by their ID; backs {@link #getVertex(ExecutionVertexID)}. */
    private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById;
    /** All scheduling vertices; exposed as a read-only view by {@link #getVertices()}. */
    private final List<DefaultExecutionVertex> executionVerticesList;
    /** Result partitions keyed by their ID; backs {@link #getResultPartition(IntermediateResultPartitionID)}. */
    private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;
    /** Pipelined region of each vertex; backs {@link #getPipelinedRegionOfVertex(ExecutionVertexID)}. */
    private final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> pipelinedRegionsByVertex;
    /** All pipelined regions; exposed as a read-only view by {@link #getAllPipelinedRegions()}. */
    private final List<DefaultSchedulingPipelinedRegion> pipelinedRegions;
    /** Edge manager taken from the execution graph; stored by the constructor without a null check. */
    private final EdgeManager edgeManager;

    /**
     * Immutable bundle of the lookup structures produced while indexing an execution graph:
     * vertices by ID, vertices as a list, vertices grouped by logical pipelined region, and
     * result partitions by ID.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static class ExecutionGraphIndex {
        private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById;
        private final List<DefaultExecutionVertex> executionVerticesList;
        private final Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> sortedExecutionVerticesInPipelinedRegion;
        private final Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById;

        /**
         * Stores the given index structures; every argument must be non-null.
         *
         * @param verticesById scheduling vertices keyed by ID
         * @param verticesList scheduling vertices as a list
         * @param verticesByRegion scheduling vertices grouped by logical pipelined region
         * @param partitionsById result partitions keyed by ID
         */
        private ExecutionGraphIndex(
                Map<ExecutionVertexID, DefaultExecutionVertex> verticesById,
                List<DefaultExecutionVertex> verticesList,
                Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> verticesByRegion,
                Map<IntermediateResultPartitionID, DefaultResultPartition> partitionsById) {
            this.executionVerticesById = Preconditions.checkNotNull(verticesById);
            this.executionVerticesList = Preconditions.checkNotNull(verticesList);
            this.sortedExecutionVerticesInPipelinedRegion = Preconditions.checkNotNull(verticesByRegion);
            this.resultPartitionsById = Preconditions.checkNotNull(partitionsById);
        }
    }

    /**
     * Immutable pair of the computed pipelined regions: the region of each vertex and the list of
     * all regions.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static class IndexedPipelinedRegions {
        private final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> pipelinedRegionsByVertex;
        private final List<DefaultSchedulingPipelinedRegion> pipelinedRegions;

        /**
         * Stores the given structures; both arguments must be non-null.
         *
         * @param regionsByVertex pipelined region keyed by execution vertex ID
         * @param regions all pipelined regions
         */
        private IndexedPipelinedRegions(
                Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex,
                List<DefaultSchedulingPipelinedRegion> regions) {
            this.pipelinedRegionsByVertex = Preconditions.checkNotNull(regionsByVertex);
            this.pipelinedRegions = Preconditions.checkNotNull(regions);
        }
    }

    /**
     * Creates a topology from pre-computed index structures. All collection arguments must be
     * non-null; the edge manager is stored as given, without a null check.
     *
     * @param verticesById scheduling vertices keyed by ID
     * @param verticesList scheduling vertices as a list
     * @param partitionsById result partitions keyed by ID
     * @param regionsByVertex pipelined region keyed by execution vertex ID
     * @param regions all pipelined regions
     * @param edgeManager edge manager of the underlying execution graph
     */
    private DefaultExecutionTopology(
            Map<ExecutionVertexID, DefaultExecutionVertex> verticesById,
            List<DefaultExecutionVertex> verticesList,
            Map<IntermediateResultPartitionID, DefaultResultPartition> partitionsById,
            Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex,
            List<DefaultSchedulingPipelinedRegion> regions,
            EdgeManager edgeManager) {
        this.executionVerticesById = Preconditions.checkNotNull(verticesById);
        this.executionVerticesList = Preconditions.checkNotNull(verticesList);
        this.resultPartitionsById = Preconditions.checkNotNull(partitionsById);
        this.pipelinedRegionsByVertex = Preconditions.checkNotNull(regionsByVertex);
        this.pipelinedRegions = Preconditions.checkNotNull(regions);
        this.edgeManager = edgeManager;
    }

    /**
     * Returns all scheduling vertices of this topology as an unmodifiable view.
     *
     * @return read-only view over the vertex list
     */
    @Override // org.apache.flink.runtime.topology.BaseTopology
    public Iterable<DefaultExecutionVertex> getVertices() {
        final List<DefaultExecutionVertex> readOnlyVertices = Collections.unmodifiableList(executionVerticesList);
        return readOnlyVertices;
    }

    /**
     * Looks up the scheduling vertex with the given ID.
     *
     * @param executionVertexID ID of the vertex to look up
     * @return the matching vertex, never {@code null}
     * @throws IllegalArgumentException if no vertex is registered under the ID
     */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingTopology
    public DefaultExecutionVertex getVertex(ExecutionVertexID executionVertexID) {
        final DefaultExecutionVertex vertex = executionVerticesById.get(executionVertexID);
        if (vertex != null) {
            return vertex;
        }
        throw new IllegalArgumentException("can not find vertex: " + executionVertexID);
    }

    /**
     * Looks up the result partition with the given ID.
     *
     * @param intermediateResultPartitionID ID of the partition to look up
     * @return the matching partition, never {@code null}
     * @throws IllegalArgumentException if no partition is registered under the ID
     */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingTopology
    public DefaultResultPartition getResultPartition(IntermediateResultPartitionID intermediateResultPartitionID) {
        final DefaultResultPartition partition = resultPartitionsById.get(intermediateResultPartitionID);
        if (partition != null) {
            return partition;
        }
        throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionID);
    }

    /**
     * Returns all pipelined regions of this topology as an unmodifiable view.
     *
     * @return read-only view over the region list
     */
    @Override // org.apache.flink.runtime.topology.Topology
    public Iterable<? extends SchedulingPipelinedRegion> getAllPipelinedRegions() {
        final List<DefaultSchedulingPipelinedRegion> regions = Preconditions.checkNotNull(pipelinedRegions);
        return Collections.unmodifiableCollection(regions);
    }

    /**
     * Returns the pipelined region that contains the given vertex.
     *
     * @param executionVertexID ID of the vertex whose region is requested
     * @return the region containing the vertex, never {@code null}
     * @throws IllegalArgumentException if the vertex is not mapped to any region
     */
    @Override // org.apache.flink.runtime.topology.Topology
    public DefaultSchedulingPipelinedRegion getPipelinedRegionOfVertex(ExecutionVertexID executionVertexID) {
        final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex =
                Preconditions.checkNotNull(pipelinedRegionsByVertex);
        final DefaultSchedulingPipelinedRegion region = regionsByVertex.get(executionVertexID);
        if (region != null) {
            return region;
        }
        throw new IllegalArgumentException("Unknown execution vertex " + executionVertexID);
    }

    /**
     * Returns the edge manager this topology was created with.
     *
     * @return the stored edge manager (the constructor does not null-check it)
     */
    public EdgeManager getEdgeManager() {
        return edgeManager;
    }

    /**
     * Builds the scheduling topology for the given execution graph.
     *
     * <p>Steps: index all execution vertices and their produced partitions, derive the scheduling
     * pipelined regions from the logical pipelined regions of the job vertices, verify that
     * co-located vertices share a region, and assemble the topology.
     *
     * @param defaultExecutionGraph execution graph to adapt; must not be {@code null}
     * @return the topology backed by the graph's vertices, partitions and edge manager
     * @throws NullPointerException if {@code defaultExecutionGraph} is {@code null}
     * @throws IllegalStateException if co-located tasks end up in different pipelined regions
     */
    public static DefaultExecutionTopology fromExecutionGraph(DefaultExecutionGraph defaultExecutionGraph) {
        Preconditions.checkNotNull(defaultExecutionGraph, "execution graph can not be null");

        final EdgeManager edgeManager = defaultExecutionGraph.getEdgeManager();

        // The logical pipelined regions are computed from the job vertices in topological order.
        final ExecutionGraphIndex graphIndex =
                computeExecutionGraphIndex(
                        defaultExecutionGraph.getAllExecutionVertices(),
                        DefaultLogicalTopology.fromTopologicallySortedJobVertices(
                                        IterableUtils.toStream(defaultExecutionGraph.getVerticesTopologically())
                                                .map(ExecutionJobVertex::getJobVertex)
                                                .collect(Collectors.toList()))
                                .getAllPipelinedRegions(),
                        edgeManager);

        // Bound method references replace the decompiler's broken lambdas (which referenced
        // undefined variables) and give the region computation direct access to the index maps.
        final IndexedPipelinedRegions indexedRegions =
                computePipelinedRegions(
                        graphIndex.sortedExecutionVerticesInPipelinedRegion.keySet(),
                        graphIndex.sortedExecutionVerticesInPipelinedRegion::get,
                        graphIndex.executionVerticesById::get,
                        graphIndex.resultPartitionsById::get);

        ensureCoLocatedVerticesInSameRegion(indexedRegions.pipelinedRegions, defaultExecutionGraph);

        return new DefaultExecutionTopology(
                graphIndex.executionVerticesById,
                graphIndex.executionVerticesList,
                graphIndex.resultPartitionsById,
                indexedRegions.pipelinedRegionsByVertex,
                indexedRegions.pipelinedRegions,
                edgeManager);
    }

    /**
     * Creates the scheduling vertices and result partitions for all execution vertices and
     * indexes them by ID, by list position, and by the logical pipelined region of their job
     * vertex.
     *
     * @param iterable execution vertices to adapt; scheduling vertices are created in this
     *     iteration order
     * @param iterable2 logical pipelined regions of the job graph
     * @param edgeManager source of consumer vertex groups and consumed partition groups
     * @return the populated index
     */
    private static ExecutionGraphIndex computeExecutionGraphIndex(Iterable<ExecutionVertex> iterable, Iterable<DefaultLogicalPipelinedRegion> iterable2, EdgeManager edgeManager) {
        final Map<ExecutionVertexID, DefaultExecutionVertex> verticesById = new HashMap<>();
        final List<DefaultExecutionVertex> verticesList = new ArrayList<>();
        final Map<IntermediateResultPartitionID, DefaultResultPartition> partitionsById = new HashMap<>();
        // Identity semantics: regions are grouped by instance, not by equals().
        final Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> verticesByRegion = new IdentityHashMap<>();
        // Fully qualified because JobVertexID is not among this file's imports.
        final Map<org.apache.flink.runtime.jobgraph.JobVertexID, DefaultLogicalPipelinedRegion> regionByJobVertexId = new HashMap<>();

        for (DefaultLogicalPipelinedRegion logicalRegion : iterable2) {
            for (LogicalVertex logicalVertex : logicalRegion.getVertices()) {
                regionByJobVertexId.put(logicalVertex.getId(), logicalRegion);
            }
        }

        for (ExecutionVertex executionVertex : iterable) {
            final List<DefaultResultPartition> producedPartitions =
                    generateProducedSchedulingResultPartition(
                            executionVertex.getProducedPartitions(),
                            edgeManager::getConsumerVertexGroupForPartition);

            // Register every produced partition; without this the partition index stays empty
            // and partition lookups (including the retriever handed to the vertex) cannot work.
            producedPartitions.forEach(partition -> partitionsById.put(partition.getId(), partition));

            final DefaultExecutionVertex schedulingVertex =
                    generateSchedulingExecutionVertex(
                            executionVertex,
                            producedPartitions,
                            edgeManager.getConsumedPartitionGroupsForVertex(executionVertex.getID()),
                            partitionsById::get);

            verticesById.put(schedulingVertex.getId(), schedulingVertex);
            verticesByRegion
                    .computeIfAbsent(
                            regionByJobVertexId.get(schedulingVertex.getId().getJobVertexId()),
                            ignored -> new ArrayList<>())
                    .add(schedulingVertex);
            verticesList.add(schedulingVertex);
        }

        return new ExecutionGraphIndex(verticesById, verticesList, verticesByRegion, partitionsById);
    }

    /**
     * Wraps each produced intermediate result partition in a {@link DefaultResultPartition}.
     *
     * <p>The partition state is supplied lazily: {@code CONSUMABLE} when the underlying partition
     * reports itself consumable at query time, {@code CREATED} otherwise.
     *
     * @param map produced partitions of one execution vertex, keyed by partition ID
     * @param function retrieves the consumer vertex group of a partition
     * @return one scheduling partition per entry, in the iteration order of {@code map.values()}
     */
    private static List<DefaultResultPartition> generateProducedSchedulingResultPartition(Map<IntermediateResultPartitionID, IntermediateResultPartition> map, Function<IntermediateResultPartitionID, ConsumerVertexGroup> function) {
        final List<DefaultResultPartition> schedulingPartitions = new ArrayList<>(map.size());
        for (IntermediateResultPartition produced : map.values()) {
            final IntermediateResultPartitionID partitionId = produced.getPartitionId();
            final Supplier<ResultPartitionState> stateSupplier =
                    () -> produced.isConsumable() ? ResultPartitionState.CONSUMABLE : ResultPartitionState.CREATED;
            schedulingPartitions.add(
                    new DefaultResultPartition(
                            partitionId,
                            produced.getIntermediateResult().getId(),
                            produced.getResultType(),
                            stateSupplier,
                            function.apply(partitionId),
                            produced::getConsumedPartitionGroups));
        }
        return schedulingPartitions;
    }

    /**
     * Creates the scheduling vertex for an execution vertex and registers it as the producer of
     * each of its produced partitions.
     *
     * @param executionVertex vertex being adapted; its execution state is read lazily
     * @param list scheduling partitions produced by the vertex
     * @param list2 partition groups consumed by the vertex
     * @param function retrieves a scheduling partition by ID
     * @return the new scheduling vertex
     */
    private static DefaultExecutionVertex generateSchedulingExecutionVertex(ExecutionVertex executionVertex, List<DefaultResultPartition> list, List<ConsumedPartitionGroup> list2, Function<IntermediateResultPartitionID, DefaultResultPartition> function) {
        final DefaultExecutionVertex schedulingVertex =
                new DefaultExecutionVertex(
                        executionVertex.getID(),
                        list,
                        executionVertex::getExecutionState,
                        list2,
                        function);
        for (DefaultResultPartition produced : list) {
            produced.setProducer(schedulingVertex);
        }
        return schedulingVertex;
    }

    /**
     * Computes the scheduling pipelined regions for all logical pipelined regions and indexes
     * them by vertex.
     *
     * <p>A logical region that contains an ALL_TO_ALL edge between two of its own vertices is
     * turned into a single scheduling region holding all its execution vertices; any other
     * logical region is split by {@link SchedulingPipelinedRegionComputeUtil}. The number of
     * regions and the elapsed time are logged at INFO level.
     *
     * @param iterable logical pipelined regions to process
     * @param function returns the execution vertices of a logical region
     * @param function2 retrieves a scheduling vertex by ID
     * @param function3 retrieves a scheduling result partition by ID
     * @return the computed regions together with the vertex-to-region mapping
     */
    private static IndexedPipelinedRegions computePipelinedRegions(Iterable<DefaultLogicalPipelinedRegion> iterable, Function<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>> function, Function<ExecutionVertexID, DefaultExecutionVertex> function2, Function<IntermediateResultPartitionID, DefaultResultPartition> function3) {
        final long startNanos = System.nanoTime();

        // Identity-based set: vertex sets are collected per instance, not compared by content.
        final Set<Set<SchedulingExecutionVertex>> rawRegions = Collections.newSetFromMap(new IdentityHashMap<>());
        for (DefaultLogicalPipelinedRegion logicalRegion : iterable) {
            final List<DefaultExecutionVertex> regionVertices = function.apply(logicalRegion);
            if (containsIntraRegionAllToAllEdge(logicalRegion)) {
                rawRegions.add(new HashSet<>(regionVertices));
            } else {
                rawRegions.addAll(
                        SchedulingPipelinedRegionComputeUtil.computePipelinedRegions(
                                regionVertices, function2, function3));
            }
        }

        final Map<ExecutionVertexID, DefaultSchedulingPipelinedRegion> regionsByVertex = new HashMap<>();
        final List<DefaultSchedulingPipelinedRegion> regions = new ArrayList<>();
        for (Set<? extends SchedulingExecutionVertex> rawRegion : rawRegions) {
            // Every element originates from the DefaultExecutionVertex lists supplied by
            // `function`, so narrowing the element type is safe.
            @SuppressWarnings("unchecked")
            final DefaultSchedulingPipelinedRegion region =
                    new DefaultSchedulingPipelinedRegion((Set<DefaultExecutionVertex>) rawRegion, function3);
            regions.add(region);
            for (SchedulingExecutionVertex vertex : rawRegion) {
                regionsByVertex.put(vertex.getId(), region);
            }
        }

        final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000;
        LOG.info("Built {} pipelined regions in {} ms", regions.size(), elapsedMillis);
        return new IndexedPipelinedRegions(regionsByVertex, regions);
    }

    /**
     * Tells whether any vertex of the region has an ALL_TO_ALL input edge whose producer vertex
     * also belongs to the region.
     *
     * @param defaultLogicalPipelinedRegion logical region to inspect
     * @return {@code true} if such an intra-region ALL_TO_ALL edge exists
     */
    private static boolean containsIntraRegionAllToAllEdge(DefaultLogicalPipelinedRegion defaultLogicalPipelinedRegion) {
        for (LogicalVertex vertex : defaultLogicalPipelinedRegion.getVertices()) {
            for (LogicalEdge input : vertex.getInputs()) {
                final boolean allToAll = input.getDistributionPattern() == DistributionPattern.ALL_TO_ALL;
                // The membership lookup only runs for ALL_TO_ALL edges.
                if (allToAll && defaultLogicalPipelinedRegion.contains(input.getProducerVertexId())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Verifies that all vertices sharing a co-location constraint belong to the same pipelined
     * region.
     *
     * @param list pipelined regions to check
     * @param executionGraph graph used to resolve the co-location constraint of each vertex
     * @throws IllegalStateException if one constraint is found in two different regions
     */
    private static void ensureCoLocatedVerticesInSameRegion(List<DefaultSchedulingPipelinedRegion> list, ExecutionGraph executionGraph) {
        // First region seen for each constraint; later sightings must be the same instance.
        final Map<CoLocationConstraint, DefaultSchedulingPipelinedRegion> regionByConstraint = new HashMap<>();
        for (DefaultSchedulingPipelinedRegion region : list) {
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                final ExecutionVertexID vertexId = ((DefaultExecutionVertex) vertex).getId();
                final CoLocationConstraint constraint = getCoLocationConstraint(vertexId, executionGraph);
                if (constraint == null) {
                    continue;
                }
                final DefaultSchedulingPipelinedRegion firstSeen = regionByConstraint.get(constraint);
                Preconditions.checkState(firstSeen == null || firstSeen == region, "co-located tasks must be in the same pipelined region");
                regionByConstraint.putIfAbsent(constraint, region);
            }
        }
    }

    /**
     * Resolves the co-location constraint of an execution vertex.
     *
     * @param executionVertexID vertex whose constraint is requested
     * @param executionGraph graph that must contain the vertex's job vertex
     * @return the constraint for the vertex's subtask index, or {@code null} if its job vertex has
     *     no co-location group
     * @throws NullPointerException if the graph has no job vertex for the given ID
     */
    private static CoLocationConstraint getCoLocationConstraint(ExecutionVertexID executionVertexID, ExecutionGraph executionGraph) {
        final ExecutionJobVertex jobVertex =
                Objects.requireNonNull(executionGraph.getJobVertex(executionVertexID.getJobVertexId()));
        final CoLocationGroup group = jobVertex.getCoLocationGroup();
        return group == null ? null : group.getLocationConstraint(executionVertexID.getSubtaskIndex());
    }
}
