package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.class */
/**
 * Tests for {@link IntermediateResultPartition}: the consumable state of pipelined and blocking
 * partitions across executions, and the number of subpartitions reported for non-dynamic and
 * dynamic graphs.
 */
public class IntermediateResultPartitionTest extends TestLogger {

    /**
     * Checks that a pipelined partition reports itself consumable only after data was produced on
     * it, and stops being consumable once the result is reset for a new execution.
     */
    @Test
    public void testPipelinedPartitionConsumable() throws Exception {
        IntermediateResult result = createResult(ResultPartitionType.PIPELINED, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];

        // Nothing is consumable right after creation.
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());

        // Producing data on one partition must not affect the other one.
        partition1.markDataProduced();
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());

        // A reset clears the consumable state again.
        result.resetForNewExecution();
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
    }

    /**
     * Checks that a blocking partition reports itself consumable once it is marked finished, that
     * the consumed partition group is finished only when both partitions are, and that a reset
     * reverts all of it.
     */
    @Test
    public void testBlockingPartitionConsumable() throws Exception {
        IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        ConsumedPartitionGroup partitionGroup =
                (ConsumedPartitionGroup) partition1.getConsumedPartitionGroups().get(0);

        // Nothing is consumable or finished right after creation.
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(partitionGroup.areAllPartitionsFinished());

        // Finishing the first partition only makes that partition consumable.
        partition1.markFinished();
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(partitionGroup.areAllPartitionsFinished());

        // With the second partition finished as well, the whole group is finished.
        partition2.markFinished();
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertTrue(partition2.isConsumable());
        Assert.assertTrue(partitionGroup.areAllPartitionsFinished());

        // A reset reverts both the partitions and the group.
        result.resetForNewExecution();
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(partitionGroup.areAllPartitionsFinished());
    }

    /**
     * Checks the unfinished-partition counter of the consumed partition group while blocking
     * partitions are finished and the result is reset, including a reset that happens when only
     * one of the two partitions has finished.
     */
    @Test
    public void testBlockingPartitionResetting() throws Exception {
        IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        ConsumedPartitionGroup partitionGroup =
                (ConsumedPartitionGroup) partition1.getConsumedPartitionGroups().get(0);

        // Nothing is consumable right after creation.
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());

        // Finish only the first partition: one partition is still outstanding.
        partition1.markFinished();
        Assert.assertEquals(1L, partitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(partitionGroup.areAllPartitionsFinished());

        // Reset while partially finished: both partitions count as unfinished again.
        result.resetForNewExecution();
        Assert.assertEquals(2L, partitionGroup.getNumberOfUnfinishedPartitions());

        // This time the second partition finishes first.
        partition2.markFinished();
        Assert.assertEquals(1L, partitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertTrue(partition2.isConsumable());
        Assert.assertFalse(partitionGroup.areAllPartitionsFinished());

        // Finishing the first partition completes the group.
        partition1.markFinished();
        Assert.assertEquals(0L, partitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertTrue(partition1.isConsumable());
        Assert.assertTrue(partition2.isConsumable());
        Assert.assertTrue(partitionGroup.areAllPartitionsFinished());

        // Reset after everything finished: back to the initial state.
        result.resetForNewExecution();
        Assert.assertEquals(2L, partitionGroup.getNumberOfUnfinishedPartitions());
        Assert.assertFalse(partition1.isConsumable());
        Assert.assertFalse(partition2.isConsumable());
        Assert.assertFalse(partitionGroup.areAllPartitionsFinished());
    }

    /** Non-dynamic graph, all-to-all, consumer parallelism 7: expects 7 subpartitions each. */
    @Test
    public void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
    }

    /** Non-dynamic graph, pointwise, consumer parallelism 7: expects 4 and 3 subpartitions. */
    @Test
    public void testGetNumberOfSubpartitionsForNonDynamicPointwiseGraph() throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
    }

    /** Dynamic graph, all-to-all, consumer parallelism 7: expects 7 subpartitions each. */
    @Test
    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph()
            throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
    }

    /** Dynamic graph, pointwise, consumer parallelism 7: expects 4 subpartitions each. */
    @Test
    public void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointwiseGraph()
            throws Exception {
        testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));
    }

    /**
     * Dynamic graph, all-to-all, consumer parallelism left unset (-1) with max parallelism 13:
     * expects 13 subpartitions each.
     */
    @Test
    public void testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicAllToAllGraph()
            throws Exception {
        testGetNumberOfSubpartitions(
                -1, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(13, 13));
    }

    /**
     * Dynamic graph, pointwise, consumer parallelism left unset (-1) with max parallelism 13:
     * expects 7 subpartitions each.
     */
    @Test
    public void testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicPointwiseGraph()
            throws Exception {
        testGetNumberOfSubpartitions(-1, DistributionPattern.POINTWISE, true, Arrays.asList(7, 7));
    }

    /**
     * Builds a two-vertex graph (producer parallelism 2, consumer max parallelism 13) and compares
     * the subpartition counts of the first produced data set's partitions with the expectation.
     *
     * @param consumerParallelism parallelism of the consuming vertex; values that are not positive
     *     leave it unset
     * @param distributionPattern pattern used to connect consumer and producer
     * @param isDynamicGraph whether a dynamic graph is built; if so the first vertex is initialized
     *     explicitly before its produced data sets are read
     * @param expectedNumSubpartitions expected subpartition count per partition; must have exactly
     *     two entries
     */
    private void testGetNumberOfSubpartitions(
            int consumerParallelism,
            DistributionPattern distributionPattern,
            boolean isDynamicGraph,
            List<Integer> expectedNumSubpartitions)
            throws Exception {
        final int producerParallelism = 2;
        final int consumerMaxParallelism = 13;

        ExecutionGraph executionGraph =
                createExecutionGraph(
                        producerParallelism,
                        consumerParallelism,
                        consumerMaxParallelism,
                        distributionPattern,
                        isDynamicGraph);

        // Only the first vertex in topological order is inspected.
        ExecutionJobVertex firstVertex =
                (ExecutionJobVertex) executionGraph.getVerticesTopologically().iterator().next();
        if (isDynamicGraph) {
            ExecutionJobVertexTest.initializeVertex(firstVertex);
        }

        IntermediateResult result = firstVertex.getProducedDataSets()[0];

        // Guard against a mis-specified expectation before comparing the actual values.
        MatcherAssert.assertThat(expectedNumSubpartitions.size(), CoreMatchers.is(2));

        List<Integer> actualNumSubpartitions =
                Arrays.stream(result.getPartitions())
                        .map(IntermediateResultPartition::getNumberOfSubpartitions)
                        .collect(Collectors.toList());
        MatcherAssert.assertThat(
                actualNumSubpartitions, Matchers.equalTo(expectedNumSubpartitions));
    }

    /**
     * Creates an execution graph of two no-op vertices, "v1" feeding "v2" through a blocking
     * result partition.
     *
     * @param producerParallelism parallelism of "v1"
     * @param consumerParallelism parallelism of "v2"; only applied when positive
     * @param consumerMaxParallelism max parallelism of "v2"; only applied when positive, and always
     *     forwarded to the vertex parallelism store computation
     * @param distributionPattern pattern used to connect "v2" to "v1"
     * @param isDynamicGraph whether to build the graph via {@code buildDynamicGraph()} instead of
     *     {@code build()}
     * @return the built execution graph
     */
    private static ExecutionGraph createExecutionGraph(
            int producerParallelism,
            int consumerParallelism,
            int consumerMaxParallelism,
            DistributionPattern distributionPattern,
            boolean isDynamicGraph)
            throws Exception {
        JobVertex producer = newNoOpVertex("v1");
        producer.setParallelism(producerParallelism);

        JobVertex consumer = newNoOpVertex("v2");
        if (consumerParallelism > 0) {
            consumer.setParallelism(consumerParallelism);
        }
        if (consumerMaxParallelism > 0) {
            consumer.setMaxParallelism(consumerMaxParallelism);
        }
        consumer.connectNewDataSetAsInput(
                producer, distributionPattern, ResultPartitionType.BLOCKING);

        JobGraph jobGraph =
                JobGraphBuilder.newBatchJobGraphBuilder()
                        .addJobVertices(Arrays.asList(producer, consumer))
                        .build();

        TestingDefaultExecutionGraphBuilder graphBuilder =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setJobMasterConfig(new Configuration())
                        .setVertexParallelismStore(
                                computeVertexParallelismStoreConsideringDynamicGraph(
                                        jobGraph.getVertices(),
                                        isDynamicGraph,
                                        consumerMaxParallelism));

        if (isDynamicGraph) {
            return graphBuilder.buildDynamicGraph();
        }
        return graphBuilder.build();
    }

    /**
     * Computes the vertex parallelism store for the given vertices, choosing the computation that
     * matches the kind of graph being built.
     *
     * @param iterable the job vertices to compute the store for
     * @param z {@code true} to delegate to {@code
     *     ExecutionJobVertexTest.computeVertexParallelismStoreForDynamicGraph}, {@code false} to
     *     delegate to {@code SchedulerBase.computeVertexParallelismStore}
     * @param i forwarded to the dynamic-graph computation and ignored otherwise; the caller in
     *     this class passes the consumer's max parallelism
     * @return the computed store
     */
    public static VertexParallelismStore computeVertexParallelismStoreConsideringDynamicGraph(Iterable<JobVertex> iterable, boolean z, int i) {
        if (z) {
            return ExecutionJobVertexTest.computeVertexParallelismStoreForDynamicGraph(iterable, i);
        }
        return SchedulerBase.computeVertexParallelismStore(iterable);
    }

    /**
     * Builds a scheduler for a batch job of two no-op vertices, "v1" feeding "v2" all-to-all, and
     * returns the first data set produced by "v1".
     *
     * @param resultPartitionType partition type of the connection between the two vertices
     * @param parallelism parallelism applied to both vertices
     * @return the first produced data set of "v1"
     */
    private static IntermediateResult createResult(
            ResultPartitionType resultPartitionType, int parallelism) throws Exception {
        JobVertex source = newNoOpVertex("v1");
        source.setParallelism(parallelism);

        JobVertex sink = newNoOpVertex("v2");
        sink.setParallelism(parallelism);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        // The same executor instance serves as both the IO and the future executor.
        DirectScheduledExecutorService directExecutor = new DirectScheduledExecutorService();

        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
        ExecutionJobVertex sourceExecutionVertex =
                SchedulerTestingUtils.newSchedulerBuilder(
                                jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setIoExecutor(directExecutor)
                        .setFutureExecutor(directExecutor)
                        .build()
                        .getExecutionJobVertex(source.getID());

        return sourceExecutionVertex.getProducedDataSets()[0];
    }

    /** Creates a job vertex with the given name that runs {@link NoOpInvokable}. */
    private static JobVertex newNoOpVertex(String name) {
        JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(NoOpInvokable.class);
        return vertex;
    }
}
