package org.apache.flink.runtime.taskmanager;

import java.lang.Thread;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.class */
public class TaskCancelAsyncProducerConsumerITCase {
    private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
    private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
    private static volatile Thread ASYNC_PRODUCER_THREAD;
    private static volatile Thread ASYNC_CONSUMER_THREAD;
    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setConfiguration(getFlinkConfiguration()).build());

    @RegisterExtension
    public static AllCallbackWrapper allCallbackWrapper = new AllCallbackWrapper(MINI_CLUSTER_RESOURCE);

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncConsumer.class */
    public static class AsyncConsumer extends AbstractInvokable {

        /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncConsumer$ConsumerThread.class */
        private static class ConsumerThread extends Thread {
            private final InputGate inputGate;

            public ConsumerThread(InputGate inputGate) {
                this.inputGate = inputGate;
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        this.inputGate.getNext();
                    } catch (Exception e) {
                        Exception unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_CONSUMER_EXCEPTION = e;
                        return;
                    }
                }
            }
        }

        public AsyncConsumer(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            ConsumerThread consumerThread = new ConsumerThread(getEnvironment().getInputGate(0));
            Thread unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_CONSUMER_THREAD = consumerThread;
            consumerThread.start();
            while (consumerThread.isAlive()) {
                try {
                    consumerThread.join();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncProducer.class */
    public static class AsyncProducer extends AbstractInvokable {

        /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncProducer$ProducerThread.class */
        private static class ProducerThread extends Thread {
            private final RecordWriter<LongValue> recordWriter;

            public ProducerThread(ResultPartitionWriter resultPartitionWriter) {
                this.recordWriter = new RecordWriterBuilder().build(resultPartitionWriter);
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                LongValue longValue = new LongValue(0L);
                while (true) {
                    try {
                        longValue.setValue(longValue.getValue() + 1);
                        this.recordWriter.emit(longValue);
                        this.recordWriter.flushAll();
                    } catch (Exception e) {
                        Exception unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_PRODUCER_EXCEPTION = e;
                        return;
                    }
                }
            }
        }

        public AsyncProducer(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            ProducerThread producerThread = new ProducerThread(getEnvironment().getWriter(0));
            Thread unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_PRODUCER_THREAD = producerThread;
            producerThread.start();
            while (producerThread.isAlive()) {
                try {
                    producerThread.join();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    private static Configuration getFlinkConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 9);
        return configuration;
    }

    @Test
    public void testCancelAsyncProducerAndConsumer() throws Exception {
        Deadline plus = Deadline.now().plus(Duration.ofMinutes(2L));
        JobVertex jobVertex = new JobVertex("AsyncProducer");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(AsyncProducer.class);
        JobVertex jobVertex2 = new JobVertex("AsyncConsumer");
        jobVertex2.setParallelism(1);
        jobVertex2.setInvokableClass(AsyncConsumer.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
        MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        miniCluster.runDetached(streamingJobGraph);
        FutureUtils.retrySuccessfulWithDelay(() -> {
            return miniCluster.getJobStatus(streamingJobGraph.getJobID());
        }, Time.milliseconds(10L), plus, jobStatus -> {
            return jobStatus == JobStatus.RUNNING;
        }, TestingUtils.defaultScheduledExecutor()).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        boolean z = false;
        for (int i = 0; i < 50; i++) {
            Thread thread = ASYNC_PRODUCER_THREAD;
            if (thread != null && thread.isAlive()) {
                z = LocalBufferPoolDestroyTest.isInBlockingBufferRequest(thread.getStackTrace());
            }
            if (z) {
                break;
            }
            Thread.sleep(500L);
        }
        Assert.assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), z);
        boolean z2 = false;
        for (int i2 = 0; i2 < 50; i2++) {
            Thread thread2 = ASYNC_CONSUMER_THREAD;
            if (thread2 != null && thread2.isAlive()) {
                z2 = thread2.getState() == Thread.State.WAITING;
            }
            if (z2) {
                break;
            }
            Thread.sleep(500L);
        }
        Assert.assertTrue("Consumer thread is not blocked.", z2);
        miniCluster.cancelJob(streamingJobGraph.getJobID()).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        FutureUtils.retrySuccessfulWithDelay(() -> {
            return miniCluster.getJobStatus(streamingJobGraph.getJobID());
        }, Time.milliseconds(10L), plus, jobStatus2 -> {
            return jobStatus2 == JobStatus.CANCELED;
        }, TestingUtils.defaultScheduledExecutor()).get(plus.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        Assert.assertNotNull(ASYNC_PRODUCER_EXCEPTION);
        Assert.assertEquals(CancelTaskException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
        Assert.assertNotNull(ASYNC_CONSUMER_EXCEPTION);
        Assert.assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
    }
}
