package org.apache.flink.runtime.jobmaster.slotpool;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests for {@link DeclarativeSlotPoolBridge}.
 *
 * <p>The suite is parameterized: every test runs once for each {@link RequestSlotMatchingStrategy}
 * returned by {@link #data()}.
 */
@RunWith(Parameterized.class)
public class DeclarativeSlotPoolBridgeTest extends TestLogger {
    private static final Time rpcTimeout = Time.seconds(20);
    private static final JobID jobId = new JobID();
    private static final JobMasterId jobMasterId = JobMasterId.generate();

    private final ComponentMainThreadExecutor mainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forMainThread();
    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;

    /**
     * Supplies the strategies the suite is parameterized over.
     *
     * @return the simple and the preferred-allocation matching strategy singletons
     * @throws IOException declared for signature compatibility; not thrown by this body
     */
    @Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
        return Arrays.asList(
                SimpleRequestSlotMatchingStrategy.INSTANCE,
                PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
    }

    /**
     * Creates a test instance bound to one matching strategy.
     *
     * @param requestSlotMatchingStrategy strategy handed to every bridge created by the tests
     */
    public DeclarativeSlotPoolBridgeTest(RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
        this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
    }

    /**
     * Requests a slot, announces a new slot to the bridge and waits for the request future to
     * complete.
     */
    @Test
    public void testSlotOffer() throws Exception {
        final SlotRequestId slotRequestId = new SlotRequestId();
        final PhysicalSlot allocatedSlot = createAllocatedSlot(new AllocationID());
        final TestingDeclarativeSlotPoolFactory slotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(slotPoolFactory, requestSlotMatchingStrategy)) {
            slotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

            final CompletableFuture<PhysicalSlot> slotFuture =
                    slotPoolBridge.requestNewAllocatedSlot(
                            slotRequestId, ResourceProfile.UNKNOWN, (Time) null);

            slotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));

            // join() rethrows if the request completed exceptionally, failing the test.
            slotFuture.join();
        }
    }

    /**
     * Asserts that a pending request fails with {@link NoResourceAvailableException} once the
     * bridge is notified that not enough resources are available.
     */
    @Test
    public void testNotEnoughResourcesAvailableFailsPendingRequests() throws Exception {
        final SlotRequestId slotRequestId = new SlotRequestId();
        final TestingDeclarativeSlotPoolFactory slotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(slotPoolFactory, requestSlotMatchingStrategy)) {
            slotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

            // The request is issued through the main thread executor; get() unwraps the outer
            // future and yields the request's own future.
            final CompletableFuture<PhysicalSlot> slotFuture =
                    CompletableFuture.supplyAsync(
                                    () ->
                                            slotPoolBridge.requestNewAllocatedSlot(
                                                    slotRequestId,
                                                    ResourceProfile.UNKNOWN,
                                                    Time.minutes(5L)),
                                    mainThreadExecutor)
                            .get();

            mainThreadExecutor.execute(
                    () ->
                            slotPoolBridge.notifyNotEnoughResourcesAvailable(
                                    Collections.emptyList()));

            MatcherAssert.assertThat(
                    slotFuture,
                    FlinkMatchers.futureWillCompleteExceptionally(
                            NoResourceAvailableException.class, Duration.ofSeconds(10L)));
        }
    }

    /**
     * Allocates an available slot, releases it again and asserts that the underlying pool's
     * free-reserved-slot callback receives the same allocation id.
     */
    @Test
    public void testReleasingAllocatedSlot() throws Exception {
        final CompletableFuture<AllocationID> releasedSlotFuture = new CompletableFuture<>();
        final AllocationID expectedAllocationId = new AllocationID();
        final PhysicalSlot allocatedSlot = createAllocatedSlot(expectedAllocationId);

        final TestingDeclarativeSlotPoolFactory slotPoolFactory =
                new TestingDeclarativeSlotPoolFactory(
                        TestingDeclarativeSlotPool.builder()
                                .setReserveFreeSlotFunction(
                                        (allocationId, resourceProfile) -> {
                                            MatcherAssert.assertThat(
                                                    allocationId,
                                                    CoreMatchers.is(expectedAllocationId));
                                            return allocatedSlot;
                                        })
                                .setFreeReservedSlotFunction(
                                        (allocationId, cause, unusedLong) -> {
                                            // Record which allocation was freed for the final
                                            // assertion.
                                            releasedSlotFuture.complete(allocationId);
                                            return ResourceCounter.empty();
                                        }));

        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(slotPoolFactory, requestSlotMatchingStrategy)) {
            slotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

            final SlotRequestId slotRequestId = new SlotRequestId();

            slotPoolBridge.allocateAvailableSlot(
                    slotRequestId, expectedAllocationId, allocatedSlot.getResourceProfile());
            slotPoolBridge.releaseSlot(slotRequestId, (Throwable) null);

            MatcherAssert.assertThat(
                    releasedSlotFuture.join(), CoreMatchers.is(expectedAllocationId));
        }
    }

    /**
     * Closes the bridge while two requests are still pending. Each request releases its slot from
     * its completion callback when it fails; the test expects the combined future to complete
     * exceptionally.
     */
    @Test
    public void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception {
        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(
                        new DefaultDeclarativeSlotPoolFactory(), requestSlotMatchingStrategy)) {
            slotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

            final List<SlotRequestId> slotRequestIds =
                    Arrays.asList(new SlotRequestId(), new SlotRequestId());

            final List<CompletableFuture<PhysicalSlot>> slotFutures =
                    slotRequestIds.stream()
                            .map(
                                    slotRequestId -> {
                                        final CompletableFuture<PhysicalSlot> slotFuture =
                                                slotPoolBridge.requestNewAllocatedSlot(
                                                        slotRequestId,
                                                        ResourceProfile.UNKNOWN,
                                                        rpcTimeout);
                                        // Release from inside the failure callback, i.e. while
                                        // the bridge is completing its pending requests.
                                        slotFuture.whenComplete(
                                                (physicalSlot, failure) -> {
                                                    if (failure != null) {
                                                        slotPoolBridge.releaseSlot(
                                                                slotRequestId, failure);
                                                    }
                                                });
                                        return slotFuture;
                                    })
                            .collect(Collectors.toList());

            // Explicit close is the action under test; try-with-resources closes once more on
            // exit, exactly as the original flow did.
            slotPoolBridge.close();

            try {
                FutureUtils.waitForAll(slotFutures).get();
                Assert.fail("The slot futures should be completed exceptionally.");
            } catch (ExecutionException expected) {
                // Expected: the pending requests must have failed.
            }
        }
    }

    /**
     * Offers a slot from a registered task manager and asserts that the pending request is
     * fulfilled with the offered allocation. The test never connects the bridge to a resource
     * manager.
     */
    @Test
    public void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception {
        try (DeclarativeSlotPoolBridge slotPoolBridge =
                createDeclarativeSlotPoolBridge(
                        new DefaultDeclarativeSlotPoolFactory(), requestSlotMatchingStrategy)) {
            slotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

            final CompletableFuture<PhysicalSlot> slotFuture =
                    slotPoolBridge.requestNewAllocatedSlot(
                            new SlotRequestId(), ResourceProfile.UNKNOWN, rpcTimeout);

            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            slotPoolBridge.registerTaskManager(taskManagerLocation.getResourceID());

            final AllocationID allocationId = new AllocationID();
            slotPoolBridge.offerSlots(
                    taskManagerLocation,
                    new SimpleAckingTaskManagerGateway(),
                    Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.ANY)));

            MatcherAssert.assertThat(
                    slotFuture.join().getAllocationId(), CoreMatchers.is(allocationId));
        }
    }

    /**
     * Builds a {@link DeclarativeSlotPoolBridge} for the shared test job id, using the system
     * clock, {@code rpcTimeout} and two further 20-second timeouts.
     *
     * @param declarativeSlotPoolFactory factory passed through to the bridge constructor
     * @param requestSlotMatchingStrategy strategy passed through to the bridge constructor
     * @return a new bridge; callers are responsible for closing it
     */
    @Nonnull
    public static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
            RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
        return new DeclarativeSlotPoolBridge(
                jobId,
                declarativeSlotPoolFactory,
                SystemClock.getInstance(),
                rpcTimeout,
                Time.seconds(20L),
                Time.seconds(20L),
                requestSlotMatchingStrategy);
    }

    /**
     * Builds an {@link AllocatedSlot} with the given allocation id, a fresh local task manager
     * location, slot index 0, {@link ResourceProfile#ANY} and a gateway backed by a testing task
     * executor gateway.
     *
     * @param allocationID allocation id of the created slot
     * @return the newly created slot
     */
    public static PhysicalSlot createAllocatedSlot(AllocationID allocationID) {
        return new AllocatedSlot(
                allocationID,
                new LocalTaskManagerLocation(),
                0,
                ResourceProfile.ANY,
                new RpcTaskManagerGateway(
                        new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(),
                        JobMasterId.generate()));
    }
}
