package org.apache.flink.test.scheduling;

import java.io.File;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.class */
public class AdaptiveSchedulerITCase extends TestLogger {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int PARALLELISM = 4;
    private static final Configuration configuration = getConfiguration();

    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/AdaptiveSchedulerITCase$DummySource.class */
    public static final class DummySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction, CheckpointListener {
        private final StopWithSavepointTestBehavior behavior;
        private volatile boolean running = true;
        private static volatile CountDownLatch instancesRunning;

        public DummySource(StopWithSavepointTestBehavior stopWithSavepointTestBehavior) {
            this.behavior = stopWithSavepointTestBehavior;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void resetForParallelism(int i) {
            instancesRunning = new CountDownLatch(i);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void awaitRunning() throws InterruptedException {
            Preconditions.checkNotNull(instancesRunning);
            instancesRunning.await();
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            Preconditions.checkNotNull(instancesRunning);
            instancesRunning.countDown();
            int i = Integer.MIN_VALUE;
            while (this.running) {
                Thread.sleep(10L);
                synchronized (sourceContext.getCheckpointLock()) {
                    int i2 = i;
                    i++;
                    sourceContext.collect(Integer.valueOf(i2));
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT) {
                throw new RuntimeException(this.behavior.name());
            }
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY && functionSnapshotContext.getCheckpointId() == 1) {
                throw new RuntimeException(this.behavior.name());
            }
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT_COMPLETE) {
                throw new RuntimeException(this.behavior.name());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/AdaptiveSchedulerITCase$SimpleSource.class */
    public static final class SimpleSource extends RichParallelSourceFunction<Integer> implements CheckpointListener, CheckpointedFunction {
        private static final ListStateDescriptor<Boolean> unionStateListDescriptor = new ListStateDescriptor<>("state", Boolean.class);
        private volatile boolean running = true;

        @Nullable
        private ListState<Boolean> unionListState = null;
        private boolean hasFailedBefore = false;
        private boolean fail = false;

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (this.running && !this.hasFailedBefore) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
                    Thread.sleep(5L);
                }
                if (this.fail) {
                    throw new FlinkException("Test failure.");
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            this.fail = true;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.unionListState = functionInitializationContext.getOperatorStateStore().getUnionListState(unionStateListDescriptor);
            Iterator it = ((Iterable) this.unionListState.get()).iterator();
            while (it.hasNext()) {
                this.hasFailedBefore |= ((Boolean) it.next()).booleanValue();
            }
            this.unionListState.clear();
            this.unionListState.add(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/AdaptiveSchedulerITCase$StopWithSavepointTestBehavior.class */
    public enum StopWithSavepointTestBehavior {
        NO_FAILURE,
        FAIL_ON_CHECKPOINT,
        FAIL_ON_CHECKPOINT_COMPLETE,
        FAIL_ON_FIRST_CHECKPOINT_ONLY
    }

    private static Configuration getConfiguration() {
        Configuration configuration2 = new Configuration();
        configuration2.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        return configuration2;
    }

    @Before
    public void ensureAdaptiveSchedulerEnabled() {
        Assume.assumeTrue(ClusterOptions.isAdaptiveSchedulerEnabled(configuration));
    }

    @After
    public void cancelRunningJobs() {
        MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobs();
    }

    @Test
    public void testGlobalFailoverCanRecoverState() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.addSource(new SimpleSource()).addSink(new DiscardingSink());
        executionEnvironment.execute();
    }

    @Test
    public void testStopWithSavepointNoError() throws Exception {
        StreamExecutionEnvironment envWithSource = getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
        DummySource.resetForParallelism(PARALLELISM);
        JobClient executeAsync = envWithSource.executeAsync();
        DummySource.awaitRunning();
        File newFolder = this.tempFolder.newFolder("savepoint");
        Assert.assertThat((String) executeAsync.stopWithSavepoint(false, newFolder.getAbsolutePath()).get(), CoreMatchers.containsString(newFolder.getAbsolutePath()));
        Assert.assertThat(executeAsync.getJobStatus().get(), CoreMatchers.is(JobStatus.FINISHED));
    }

    @Test
    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
        StreamExecutionEnvironment envWithSource = getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
        envWithSource.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        DummySource.resetForParallelism(PARALLELISM);
        JobClient executeAsync = envWithSource.executeAsync();
        DummySource.awaitRunning();
        try {
            executeAsync.stopWithSavepoint(false, this.tempFolder.newFolder("savepoint").getAbsolutePath()).get();
            Assert.fail("Expect exception");
        } catch (ExecutionException e) {
            Assert.assertThat(e, FlinkMatchers.containsCause(FlinkException.class));
        }
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(executeAsync.getJobStatus().get() == JobStatus.RUNNING);
        }, Deadline.fromNow(Duration.of(1L, ChronoUnit.MINUTES)));
    }

    @Test
    public void testStopWithSavepointFailOnStop() throws Exception {
        StreamExecutionEnvironment envWithSource = getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT_COMPLETE);
        envWithSource.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        DummySource.resetForParallelism(PARALLELISM);
        JobClient executeAsync = envWithSource.executeAsync();
        DummySource.awaitRunning();
        try {
            executeAsync.stopWithSavepoint(false, this.tempFolder.newFolder("savepoint").getAbsolutePath()).get();
            Assert.fail("Expect exception");
        } catch (ExecutionException e) {
            Assert.assertThat(e, FlinkMatchers.containsCause(FlinkException.class));
        }
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(executeAsync.getJobStatus().get() == JobStatus.RUNNING);
        }, Deadline.fromNow(Duration.of(1L, ChronoUnit.MINUTES)));
    }

    @Test
    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY)).addSink(new DiscardingSink());
        DummySource.resetForParallelism(PARALLELISM);
        JobClient executeAsync = executionEnvironment.executeAsync();
        DummySource.awaitRunning();
        DummySource.resetForParallelism(PARALLELISM);
        File newFolder = this.tempFolder.newFolder("savepoint");
        try {
            executeAsync.stopWithSavepoint(false, newFolder.getAbsolutePath()).get();
            Assert.fail("Expect failure of operation");
        } catch (ExecutionException e) {
            Assert.assertThat(e, FlinkMatchers.containsCause(FlinkException.class));
        }
        DummySource.awaitRunning();
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(isDirectoryEmpty(newFolder));
        }, Deadline.fromNow(Duration.ofSeconds(10L)));
        Assert.assertThat((String) executeAsync.stopWithSavepoint(false, newFolder.getAbsolutePath()).get(), CoreMatchers.containsString(newFolder.getAbsolutePath()));
    }

    private boolean isDirectoryEmpty(File file) {
        File[] listFiles = file.listFiles();
        if (listFiles.length <= 0) {
            return true;
        }
        this.log.warn("There are still unexpected files: {}", Arrays.stream(listFiles).map((v0) -> {
            return v0.getAbsolutePath();
        }).collect(Collectors.joining(", ")));
        return false;
    }

    private static StreamExecutionEnvironment getEnvWithSource(StopWithSavepointTestBehavior stopWithSavepointTestBehavior) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.addSource(new DummySource(stopWithSavepointTestBehavior)).addSink(new DiscardingSink());
        return executionEnvironment;
    }
}
