package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/**
 * Tests for {@link SortMergeResultPartitionReadScheduler}.
 *
 * <p>Before every test a fresh {@link PartitionedFile} is written to a temporary folder and a
 * scheduler is created on top of a {@link BatchShuffleReadBufferPool} and a fixed-size thread
 * pool. The test instance itself is handed to the scheduler as its third constructor argument.
 */
public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {

    /** Size in bytes of every buffer written to and read back from the partitioned file. */
    private static final int BUFFER_SIZE = 1024;

    /** Content of every written buffer; refilled with random bytes before each test. */
    private static final byte[] DATA_BYTES = new byte[BUFFER_SIZE];

    /** Total number of bytes managed by the shared read buffer pool. */
    private static final int TOTAL_BYTES = BUFFER_SIZE;

    /** Number of threads in the executor handed to the scheduler. */
    private static final int NUM_THREADS = 4;

    /** Number of subpartitions written to the partitioned file. */
    private static final int NUM_SUBPARTITIONS = 10;

    /** Number of buffers written per subpartition. */
    private static final int NUM_BUFFERS_PER_SUBPARTITION = 10;

    private PartitionedFile partitionedFile;
    private BatchShuffleReadBufferPool bufferPool;
    private ExecutorService executor;
    private SortMergeResultPartitionReadScheduler readScheduler;

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Fails any test that runs longer than 60 seconds. */
    @Rule
    public final Timeout timeout = new Timeout(60, TimeUnit.SECONDS);

    /**
     * Buffer pool that requests buffers from itself on construction and gives one of them back
     * every time {@link #getLastBufferOperationTimestamp()} is queried.
     *
     * <p>NOTE(review): presumably this simulates other pool users making progress so that the
     * scheduler's buffer request timeout is refreshed; confirm against the scheduler.
     */
    private static class FakeBatchShuffleReadBufferPool extends BatchShuffleReadBufferPool {

        /** Buffers taken in the constructor that have not been handed back yet. */
        private final Queue<MemorySegment> requestedBuffers;

        FakeBatchShuffleReadBufferPool(long j, int i) throws Exception {
            super(j, i);
            this.requestedBuffers = new LinkedList<>(requestBuffers());
        }

        /**
         * Recycles one of the held buffers (if any is left) and then returns the timestamp
         * reported by the real pool.
         */
        public long getLastBufferOperationTimestamp() {
            MemorySegment segment = requestedBuffers.poll();
            // poll() yields null once every held buffer has been handed back; never pass that
            // null on to recycle().
            if (segment != null) {
                recycle(segment);
            }
            return super.getLastBufferOperationTimestamp();
        }

        /** Hands back every buffer that is still held before destroying the pool. */
        public void destroy() {
            recycle(requestedBuffers);
            requestedBuffers.clear();
            super.destroy();
        }
    }

    /** Writes a new partitioned file and builds the pool, executor and scheduler under test. */
    @Before
    public void before() throws Exception {
        new Random().nextBytes(DATA_BYTES);
        String dataFilePath = temporaryFolder.newFile().getAbsolutePath();
        partitionedFile =
                PartitionTestUtils.createPartitionedFile(
                        dataFilePath,
                        NUM_SUBPARTITIONS,
                        NUM_BUFFERS_PER_SUBPARTITION,
                        BUFFER_SIZE,
                        DATA_BYTES);
        bufferPool = new BatchShuffleReadBufferPool(TOTAL_BYTES, BUFFER_SIZE);
        executor = Executors.newFixedThreadPool(NUM_THREADS);
        readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, executor, this);
    }

    /**
     * Deletes the partitioned file, destroys the buffer pool and shuts the executor down.
     *
     * <p>JUnit runs this method even when {@link #before()} failed half way, hence the null
     * checks: a partially initialized fixture must not hide the original failure behind a
     * {@link NullPointerException}.
     */
    @After
    public void after() {
        if (partitionedFile != null) {
            partitionedFile.deleteQuietly();
        }
        if (bufferPool != null) {
            bufferPool.destroy();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Creating a reader must leave the scheduler running with both file channels open, and the
     * reader must deliver all buffers of subpartition 0 with the written content.
     */
    @Test
    public void testCreateSubpartitionReader() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        Assert.assertTrue(readScheduler.isRunning());
        Assert.assertTrue(readScheduler.getDataFileChannel().isOpen());
        Assert.assertTrue(readScheduler.getIndexFileChannel().isOpen());

        int numBuffersRead = 0;
        while (numBuffersRead < NUM_BUFFERS_PER_SUBPARTITION) {
            ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                    subpartitionReader.getNextBuffer();
            if (bufferAndBacklog == null) {
                // Nothing to consume yet; keep polling (bounded by the Timeout rule).
                continue;
            }
            Buffer buffer = bufferAndBacklog.buffer();
            Assert.assertEquals(ByteBuffer.wrap(DATA_BYTES), buffer.getNioBufferReadable());
            buffer.recycleBuffer();
            numBuffersRead++;
        }
    }

    /**
     * Releasing the resources of the only reader must stop the scheduler and free everything
     * it holds.
     */
    @Test
    public void testOnSubpartitionReaderError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        subpartitionReader.releaseAllResources();

        waitUntilReadFinish();
        assertAllResourcesReleased();
    }

    /**
     * Releasing the scheduler one second after a reader was created must fail and release that
     * reader and free all scheduler resources once the release future completes.
     */
    @Test
    public void testReleaseWhileReading() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        // NOTE(review): presumably gives the scheduler time to start reading; confirm.
        Thread.sleep(1000L);
        readScheduler.release();

        Assert.assertNotNull(subpartitionReader.getFailureCause());
        Assert.assertTrue(subpartitionReader.isReleased());
        Assert.assertEquals(0L, subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
        Assert.assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());

        readScheduler.getReleaseFuture().get();
        assertAllResourcesReleased();
    }

    /** Creating a reader on an already released scheduler must be rejected. */
    @Test(expected = IllegalStateException.class)
    public void testCreateSubpartitionReaderAfterReleased() throws Exception {
        readScheduler.release();
        try {
            readScheduler.createSubpartitionReader(
                    new NoOpBufferAvailablityListener(), 0, partitionedFile);
        } finally {
            assertAllResourcesReleased();
        }
    }

    /**
     * Closing the data file channel underneath the scheduler must eventually release the
     * reader with a failure cause and free all scheduler resources.
     */
    @Test
    public void testOnDataReadError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        readScheduler.getDataFileChannel().close();

        // Drain (and recycle) whatever was read before the failure is noticed.
        while (!subpartitionReader.isReleased()) {
            ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                    subpartitionReader.getNextBuffer();
            if (bufferAndBacklog != null) {
                bufferAndBacklog.buffer().recycleBuffer();
            }
        }

        waitUntilReadFinish();
        Assert.assertNotNull(subpartitionReader.getFailureCause());
        Assert.assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
        assertAllResourcesReleased();
    }

    /**
     * Destroying the buffer pool while a reader exists must release the reader with a failure
     * cause and free all scheduler resources.
     */
    @Test
    public void testOnReadBufferRequestError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        bufferPool.destroy();

        waitUntilReadFinish();
        Assert.assertTrue(subpartitionReader.isReleased());
        Assert.assertNotNull(subpartitionReader.getFailureCause());
        Assert.assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
        assertAllResourcesReleased();
    }

    /**
     * With buffers already taken from the pool by the test, allocation must return no buffers
     * after more than the configured timeout and fail the reader with a "Buffer request
     * timeout" error.
     */
    @Test
    public void testRequestBufferTimeoutAndFailed() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        List<MemorySegment> occupiedBuffers = bufferPool.requestBuffers();
        SortMergeResultPartitionReadScheduler scheduler =
                new SortMergeResultPartitionReadScheduler(
                        bufferPool, executor, this, bufferRequestTimeout);
        try {
            SortMergeSubpartitionReader subpartitionReader =
                    scheduler.createSubpartitionReader(
                            new NoOpBufferAvailablityListener(), 0, partitionedFile);
            PriorityQueue<SortMergeSubpartitionReader> readers = new PriorityQueue<>();
            readers.add(subpartitionReader);

            long startNanos = System.nanoTime();
            Queue<MemorySegment> allocatedBuffers = scheduler.allocateBuffers(readers);
            long elapsedNanos = System.nanoTime() - startNanos;

            Assert.assertEquals(0L, allocatedBuffers.size());
            Assert.assertTrue(elapsedNanos > bufferRequestTimeout.toNanos());
            assertExpectedTimeoutException(subpartitionReader.getFailureCause());
        } finally {
            // Clean up even when an assertion fails so the scheduler does not outlive the test.
            bufferPool.recycle(occupiedBuffers);
            scheduler.release();
        }
    }

    /**
     * With a pool that hands one buffer back on every timestamp query, allocation must succeed
     * with all three buffers, without a failure cause, after more than twice the configured
     * timeout.
     */
    @Test
    public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        int numBuffers = 3;
        FakeBatchShuffleReadBufferPool fakeBufferPool =
                new FakeBatchShuffleReadBufferPool((long) numBuffers * BUFFER_SIZE, BUFFER_SIZE);
        SortMergeResultPartitionReadScheduler scheduler =
                new SortMergeResultPartitionReadScheduler(
                        fakeBufferPool, executor, this, bufferRequestTimeout);

        try (FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
                FileChannel indexFileChannel =
                        openFileChannel(partitionedFile.getIndexFilePath())) {
            PartitionedFileReader fileReader =
                    new PartitionedFileReader(
                            partitionedFile, 0, dataFileChannel, indexFileChannel);
            SortMergeSubpartitionReader subpartitionReader =
                    new SortMergeSubpartitionReader(
                            new NoOpBufferAvailablityListener(), fileReader);
            PriorityQueue<SortMergeSubpartitionReader> readers = new PriorityQueue<>();
            readers.add(subpartitionReader);

            long startNanos = System.nanoTime();
            Queue<MemorySegment> allocatedBuffers = scheduler.allocateBuffers(readers);
            long elapsedNanos = System.nanoTime() - startNanos;

            Assert.assertEquals((long) numBuffers, allocatedBuffers.size());
            Assert.assertTrue(elapsedNanos > bufferRequestTimeout.toNanos() * 2);
            Assert.assertNull(subpartitionReader.getFailureCause());

            fakeBufferPool.recycle(allocatedBuffers);
        } finally {
            // The channels are closed by try-with-resources; the pool and the scheduler are
            // torn down here so that a failed assertion cannot leak them.
            fakeBufferPool.destroy();
            scheduler.release();
        }
    }

    /** Opens the file at {@code path} for reading. */
    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /**
     * Asserts that {@code th} is non-null and that it, or a throwable in its chain, carries
     * the message "Buffer request timeout".
     */
    private static void assertExpectedTimeoutException(Throwable th) {
        Assert.assertNotNull(th);
        Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(th, "Buffer request timeout")
                        .isPresent());
    }

    /**
     * Asserts that the shared scheduler holds no file channels, is not running and has no
     * pending readers, and that every buffer is back in the shared pool unless the pool has
     * been destroyed.
     */
    private void assertAllResourcesReleased() {
        Assert.assertNull(readScheduler.getDataFileChannel());
        Assert.assertNull(readScheduler.getIndexFileChannel());
        Assert.assertFalse(readScheduler.isRunning());
        Assert.assertEquals(0L, readScheduler.getNumPendingReaders());

        if (!bufferPool.isDestroyed()) {
            Assert.assertEquals(bufferPool.getNumTotalBuffers(), bufferPool.getAvailableBuffers());
        }
    }

    /** Polls every 100 ms until the shared scheduler reports that it is no longer running. */
    private void waitUntilReadFinish() throws Exception {
        while (readScheduler.isRunning()) {
            Thread.sleep(100L);
        }
    }
}
