package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.class */
public class AdaptiveSchedulerClusterITCase extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int PARALLELISM = 4;
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
    private final Configuration configuration = createConfiguration();

    @Rule
    public final MiniClusterResource miniClusterResource = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(this.configuration).setNumberSlotsPerTaskManager(2).setNumberTaskManagers(2).build());

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase$CheckpointingNoOpInvokable.class */
    public static class CheckpointingNoOpInvokable extends AbstractInvokable {
        private static final long CANCEL_SIGNAL = -2;
        private final BlockingQueue<Long> checkpointsToConfirm;

        public CheckpointingNoOpInvokable(Environment environment) {
            super(environment);
            this.checkpointsToConfirm = new ArrayBlockingQueue(1);
        }

        public void invoke() throws Exception {
            long longValue = this.checkpointsToConfirm.take().longValue();
            while (true) {
                long j = longValue;
                if (j == CANCEL_SIGNAL) {
                    return;
                }
                getEnvironment().acknowledgeCheckpoint(j, new CheckpointMetrics());
                longValue = this.checkpointsToConfirm.take().longValue();
            }
        }

        public void cancel() throws Exception {
            this.checkpointsToConfirm.add(Long.valueOf(CANCEL_SIGNAL));
        }

        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.checkpointsToConfirm.add(Long.valueOf(checkpointMetaData.getCheckpointId()));
            return CompletableFuture.completedFuture(true);
        }

        public Future<Void> notifyCheckpointCompleteAsync(long j) {
            return CompletableFuture.completedFuture(null);
        }
    }

    private Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));
        configuration.set(WebOptions.CHECKPOINTS_HISTORY_SIZE, Integer.MAX_VALUE);
        return configuration;
    }

    @Before
    public void setUp() {
        OnceBlockingNoOpInvokable.reset();
    }

    @Test
    public void testAutomaticScaleDownInCaseOfLostSlots() throws Exception {
        MiniCluster miniCluster = this.miniClusterResource.getMiniCluster();
        JobGraph createBlockingJobGraph = createBlockingJobGraph(4);
        miniCluster.submitJob(createBlockingJobGraph).join();
        CompletableFuture requestJobResult = miniCluster.requestJobResult(createBlockingJobGraph.getJobID());
        waitUntilParallelismForVertexReached(createBlockingJobGraph.getJobID(), JOB_VERTEX_ID, 4);
        miniCluster.terminateTaskManager(0);
        waitUntilParallelismForVertexReached(createBlockingJobGraph.getJobID(), JOB_VERTEX_ID, 2);
        OnceBlockingNoOpInvokable.unblock();
        Assert.assertTrue(((JobResult) requestJobResult.join()).isSuccess());
    }

    @Test
    public void testAutomaticScaleUp() throws Exception {
        MiniCluster miniCluster = this.miniClusterResource.getMiniCluster();
        int i = 4 + 2;
        JobGraph createBlockingJobGraph = createBlockingJobGraph(i);
        this.log.info("Submitting job with parallelism of " + i + ", to a cluster with only one TM.");
        miniCluster.submitJob(createBlockingJobGraph).join();
        CompletableFuture requestJobResult = miniCluster.requestJobResult(createBlockingJobGraph.getJobID());
        waitUntilParallelismForVertexReached(createBlockingJobGraph.getJobID(), JOB_VERTEX_ID, 4);
        this.log.info("Start additional TaskManager to scale up to the full parallelism.");
        miniCluster.startTaskManager();
        this.log.info("Waiting until Invokable is running with higher parallelism");
        waitUntilParallelismForVertexReached(createBlockingJobGraph.getJobID(), JOB_VERTEX_ID, i);
        OnceBlockingNoOpInvokable.unblock();
        Assert.assertTrue(((JobResult) requestJobResult.join()).isSuccess());
    }

    @Test
    public void testCheckpointStatsPersistedAcrossRescale() throws Exception {
        MiniCluster miniCluster = this.miniClusterResource.getMiniCluster();
        JobVertex jobVertex = new JobVertex("jobVertex", JOB_VERTEX_ID);
        jobVertex.setInvokableClass(CheckpointingNoOpInvokable.class);
        jobVertex.setParallelism(4);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        streamingJobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(100L).setCheckpointTimeout(1000L).build(), (SerializedValue) null));
        miniCluster.submitJob(streamingJobGraph).join();
        CommonTestUtils.waitUntilCondition(() -> {
            return (Boolean) miniCluster.getExecutionGraph(streamingJobGraph.getJobID()).thenApply(accessExecutionGraph -> {
                return Boolean.valueOf(accessExecutionGraph.getCheckpointStatsSnapshot().getCounts().getNumberOfCompletedCheckpoints() > 0);
            }).get();
        }, Deadline.fromNow(Duration.ofHours(1L)));
        miniCluster.terminateTaskManager(0);
        waitUntilParallelismForVertexReached(streamingJobGraph.getJobID(), JOB_VERTEX_ID, 2);
        List list = (List) miniCluster.getExecutionGraph(streamingJobGraph.getJobID()).thenApply(accessExecutionGraph -> {
            return accessExecutionGraph.getCheckpointStatsSnapshot().getHistory().getCheckpoints();
        }).get();
        MatcherAssert.assertThat(Long.valueOf(((AbstractCheckpointStats) list.get(list.size() - 1)).getCheckpointId()), Is.is(1L));
    }

    private JobGraph createBlockingJobGraph(int i) throws IOException {
        JobVertex jobVertex = new JobVertex("Blocking operator", JOB_VERTEX_ID);
        jobVertex.setInvokableClass(OnceBlockingNoOpInvokable.class);
        jobVertex.setParallelism(i);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        streamingJobGraph.setExecutionConfig(executionConfig);
        return streamingJobGraph;
    }

    private void waitUntilParallelismForVertexReached(JobID jobID, JobVertexID jobVertexID, int i) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            ArchivedExecutionGraph archivedExecutionGraph = (ArchivedExecutionGraph) this.miniClusterResource.getMiniCluster().getArchivedExecutionGraph(jobID).get();
            if (archivedExecutionGraph.getState() == JobStatus.INITIALIZING) {
                return false;
            }
            return Boolean.valueOf(((AccessExecutionJobVertex) archivedExecutionGraph.getAllVertices().get(jobVertexID)).getParallelism() == i);
        }, Deadline.fromNow(Duration.ofSeconds(10L)));
    }
}
