package org.apache.flink.state.changelog;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.state.changelog.PeriodicMaterializationManager;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyedStateBackend.class */
public class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);
    private static final CheckpointOptions CHECKPOINT_OPTIONS = new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = (Map) Stream.of((Object[]) new Tuple2[]{Tuple2.of(StateDescriptor.Type.VALUE, ChangelogValueState::create), Tuple2.of(StateDescriptor.Type.LIST, ChangelogListState::create), Tuple2.of(StateDescriptor.Type.REDUCING, ChangelogReducingState::create), Tuple2.of(StateDescriptor.Type.AGGREGATING, ChangelogAggregatingState::create), Tuple2.of(StateDescriptor.Type.MAP, ChangelogMapState::create)}).collect(Collectors.toMap(tuple2 -> {
        return (StateDescriptor.Type) tuple2.f0;
    }, tuple22 -> {
        return (StateFactory) tuple22.f1;
    }));
    private final AbstractKeyedStateBackend<K> keyedStateBackend;
    private final ExecutionConfig executionConfig;
    private final TtlTimeProvider ttlTimeProvider;
    private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter;
    private final CheckpointStreamFactory streamFactory;
    private ChangelogSnapshotState changelogSnapshotState;
    private InternalKvState lastState;
    private String lastName;

    @Nullable
    private SequenceNumber lastUploadedFrom;

    @Nullable
    private SequenceNumber lastUploadedTo;
    private final String subtaskName;
    private final Closer closer = Closer.create();
    private long lastCheckpointId = -1;
    private long materializedId = 0;
    private final FunctionDelegationHelper functionDelegationHelper = new FunctionDelegationHelper();
    private short lastCreatedStateId = -1;
    private final Map<String, InternalKvState<K, ?, ?>> keyValueStatesByName = new HashMap();
    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName = new HashMap();
    private final Map<String, ChangelogState> changelogStates = new HashMap();

    /* renamed from: org.apache.flink.state.changelog.ChangelogKeyedStateBackend$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyedStateBackend$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType = new int[StateMetaInfoSnapshot.BackendStateType.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType[StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType[StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyedStateBackend$ChangelogSnapshotState.class */
    public static class ChangelogSnapshotState {
        private final List<KeyedStateHandle> materializedSnapshot;
        private final SequenceNumber materializedTo;
        private final List<ChangelogStateHandle> restoredNonMaterialized;

        public ChangelogSnapshotState(List<KeyedStateHandle> list, List<ChangelogStateHandle> list2, SequenceNumber sequenceNumber) {
            this.materializedSnapshot = Collections.unmodifiableList(list);
            this.restoredNonMaterialized = Collections.unmodifiableList(list2);
            this.materializedTo = sequenceNumber;
        }

        public List<KeyedStateHandle> getMaterializedSnapshot() {
            return this.materializedSnapshot;
        }

        public SequenceNumber lastMaterializedTo() {
            return this.materializedTo;
        }

        public List<ChangelogStateHandle> getRestoredNonMaterialized() {
            return this.restoredNonMaterialized;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyedStateBackend$StateFactory.class */
    public interface StateFactory {
        /* JADX WARN: Incorrect return type in method signature: <K:Ljava/lang/Object;N:Ljava/lang/Object;SV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/runtime/state/internal/InternalKvState<TK;TN;TSV;>;Lorg/apache/flink/state/changelog/KvStateChangeLogger<TSV;TN;>;Lorg/apache/flink/runtime/state/heap/InternalKeyContext<TK;>;)TIS; */
        State create(InternalKvState internalKvState, KvStateChangeLogger kvStateChangeLogger, InternalKeyContext internalKeyContext) throws Exception;
    }

    public ChangelogKeyedStateBackend(AbstractKeyedStateBackend<K> abstractKeyedStateBackend, String str, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> collection, CheckpointStorageWorkerView checkpointStorageWorkerView) {
        this.keyedStateBackend = abstractKeyedStateBackend;
        this.subtaskName = str;
        this.executionConfig = executionConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.stateChangelogWriter = stateChangelogWriter;
        this.changelogSnapshotState = completeRestore(collection);
        this.streamFactory = checkpointedStateScope -> {
            return checkpointStorageWorkerView.createTaskOwnedStateStream();
        };
        this.closer.register(abstractKeyedStateBackend);
    }

    public KeyGroupRange getKeyGroupRange() {
        return this.keyedStateBackend.getKeyGroupRange();
    }

    public void close() throws IOException {
        this.closer.close();
    }

    public void setCurrentKey(K k) {
        this.keyedStateBackend.setCurrentKey(k);
    }

    public K getCurrentKey() {
        return (K) this.keyedStateBackend.getCurrentKey();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.keyedStateBackend.getKeySerializer();
    }

    public <N> Stream<K> getKeys(String str, N n) {
        return this.keyedStateBackend.getKeys(str, n);
    }

    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String str) {
        return this.keyedStateBackend.getKeysAndNamespaces(str);
    }

    public void dispose() {
        this.keyedStateBackend.dispose();
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName.clear();
        this.changelogStates.clear();
        this.priorityQueueStatesByName.clear();
    }

    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        this.keyedStateBackend.registerKeySelectionListener(keySelectionListener);
    }

    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        return this.keyedStateBackend.deregisterKeySelectionListener(keySelectionListener);
    }

    public <N, S extends State, T> void applyToAllKeys(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction) throws Exception {
        this.keyedStateBackend.applyToAllKeys(n, typeSerializer, stateDescriptor, keyedStateFunction, this::getPartitionedState);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <N, S extends State> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(n, "Namespace");
        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(n);
            return this.lastState;
        }
        InternalKvState<K, ?, ?> internalKvState = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (internalKvState != null) {
            this.lastState = internalKvState;
            this.lastState.setCurrentNamespace(n);
            this.lastName = stateDescriptor.getName();
            this.functionDelegationHelper.addOrUpdate(stateDescriptor);
            return internalKvState;
        }
        InternalKvState orCreateKeyedState = getOrCreateKeyedState(typeSerializer, stateDescriptor);
        InternalKvState internalKvState2 = orCreateKeyedState;
        this.lastName = stateDescriptor.getName();
        this.lastState = internalKvState2;
        internalKvState2.setCurrentNamespace(n);
        return orCreateKeyedState;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        this.lastCheckpointId = j;
        this.lastUploadedFrom = this.changelogSnapshotState.lastMaterializedTo();
        this.lastUploadedTo = getLastAppendedTo();
        LOG.info("snapshot of {} for checkpoint {}, change range: {}..{}", new Object[]{this.subtaskName, Long.valueOf(j), this.lastUploadedFrom, this.lastUploadedTo});
        ChangelogSnapshotState changelogSnapshotState = this.changelogSnapshotState;
        return toRunnableFuture(this.stateChangelogWriter.persist(this.lastUploadedFrom).thenApply(changelogStateHandle -> {
            return buildSnapshotResult(changelogStateHandle, changelogSnapshotState);
        }));
    }

    private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle changelogStateHandle, ChangelogSnapshotState changelogSnapshotState) {
        ArrayList arrayList = new ArrayList(changelogSnapshotState.getRestoredNonMaterialized());
        if (changelogStateHandle != null && changelogStateHandle.getStateSize() > 0) {
            arrayList.add(changelogStateHandle);
        }
        return (arrayList.isEmpty() && changelogSnapshotState.getMaterializedSnapshot().isEmpty()) ? SnapshotResult.empty() : SnapshotResult.of(new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(changelogSnapshotState.getMaterializedSnapshot(), arrayList, getKeyGroupRange()));
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        ChangelogKeyGroupedPriorityQueue<?> changelogKeyGroupedPriorityQueue = this.priorityQueueStatesByName.get(str);
        if (changelogKeyGroupedPriorityQueue == null) {
            InternalKeyContext keyContext = this.keyedStateBackend.getKeyContext();
            StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter = this.stateChangelogWriter;
            RegisteredPriorityQueueStateBackendMetaInfo registeredPriorityQueueStateBackendMetaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(str, typeSerializer);
            short s = (short) (this.lastCreatedStateId + 1);
            this.lastCreatedStateId = s;
            PriorityQueueStateChangeLoggerImpl priorityQueueStateChangeLoggerImpl = new PriorityQueueStateChangeLoggerImpl(typeSerializer, keyContext, stateChangelogWriter, registeredPriorityQueueStateBackendMetaInfo, s);
            this.closer.register(priorityQueueStateChangeLoggerImpl);
            changelogKeyGroupedPriorityQueue = new ChangelogKeyGroupedPriorityQueue<>(this.keyedStateBackend.create(str, typeSerializer), priorityQueueStateChangeLoggerImpl, typeSerializer);
            this.priorityQueueStatesByName.put(str, changelogKeyGroupedPriorityQueue);
        }
        return changelogKeyGroupedPriorityQueue;
    }

    @VisibleForTesting
    public int numKeyValueStateEntries() {
        return this.keyedStateBackend.numKeyValueStateEntries();
    }

    public boolean isSafeToReuseKVState() {
        return this.keyedStateBackend.isSafeToReuseKVState();
    }

    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        return this.keyedStateBackend.savepoint();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.lastCheckpointId == j) {
            this.stateChangelogWriter.confirm(this.lastUploadedFrom, this.lastUploadedTo);
        }
        this.keyedStateBackend.notifyCheckpointComplete(j);
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        if (this.lastCheckpointId == j) {
            this.stateChangelogWriter.reset(this.lastUploadedFrom, this.lastUploadedTo);
        }
        this.keyedStateBackend.notifyCheckpointAborted(j);
    }

    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        Preconditions.checkNotNull(getKeySerializer(), "State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        InternalKvState<K, ?, ?> internalKvState = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (internalKvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }
            internalKvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(typeSerializer, stateDescriptor, this, this.ttlTimeProvider), stateDescriptor, this.keyedStateBackend.getLatencyTrackingStateConfig());
            this.keyValueStatesByName.put(stateDescriptor.getName(), internalKvState);
            this.keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, internalKvState);
        }
        this.functionDelegationHelper.addOrUpdate(stateDescriptor);
        return internalKvState;
    }

    /* JADX WARN: Incorrect return type in method signature: <N:Ljava/lang/Object;SV:Ljava/lang/Object;SEV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/runtime/state/StateSnapshotTransformer$StateSnapshotTransformFactory<TSEV;>;)TIS; */
    @Nonnull
    public State createInternalState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory stateSnapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getType());
        if (stateFactory == null) {
            throw new FlinkRuntimeException(String.format("State %s is not supported by %s", stateDescriptor.getClass(), getClass()));
        }
        RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo = new RegisteredKeyValueStateBackendMetaInfo(stateDescriptor.getType(), stateDescriptor.getName(), typeSerializer, stateDescriptor.getSerializer(), stateSnapshotTransformFactory);
        InternalKvState internalKvState = (InternalKvState) this.keyedStateBackend.createInternalState(typeSerializer, stateDescriptor, stateSnapshotTransformFactory);
        TypeSerializer keySerializer = internalKvState.getKeySerializer();
        TypeSerializer namespaceSerializer = internalKvState.getNamespaceSerializer();
        TypeSerializer valueSerializer = internalKvState.getValueSerializer();
        InternalKeyContext keyContext = this.keyedStateBackend.getKeyContext();
        StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter = this.stateChangelogWriter;
        StateTtlConfig ttlConfig = stateDescriptor.getTtlConfig();
        Object defaultValue = stateDescriptor.getDefaultValue();
        short s = (short) (this.lastCreatedStateId + 1);
        this.lastCreatedStateId = s;
        KvStateChangeLoggerImpl kvStateChangeLoggerImpl = new KvStateChangeLoggerImpl(keySerializer, namespaceSerializer, valueSerializer, keyContext, stateChangelogWriter, registeredKeyValueStateBackendMetaInfo, ttlConfig, defaultValue, s);
        this.closer.register(kvStateChangeLoggerImpl);
        ChangelogState create = stateFactory.create(internalKvState, kvStateChangeLoggerImpl, this.keyedStateBackend);
        this.changelogStates.put(stateDescriptor.getName(), create);
        return create;
    }

    public void registerCloseable(@Nullable Closeable closeable) {
        this.closer.register(closeable);
    }

    private ChangelogSnapshotState completeRestore(Collection<ChangelogStateBackendHandle> collection) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (ChangelogStateBackendHandle changelogStateBackendHandle : collection) {
            if (changelogStateBackendHandle != null) {
                arrayList.addAll(changelogStateBackendHandle.getMaterializedStateHandles());
                arrayList2.addAll(changelogStateBackendHandle.getNonMaterializedStateHandles());
            }
        }
        return new ChangelogSnapshotState(arrayList, arrayList2, this.stateChangelogWriter.initialSequenceNumber());
    }

    public Optional<PeriodicMaterializationManager.MaterializationRunnable> initMaterialization() throws Exception {
        SequenceNumber lastAppendedTo = getLastAppendedTo();
        SequenceNumber lastMaterializedTo = this.changelogSnapshotState.lastMaterializedTo();
        LOG.info("Initialize Materialization. Current changelog writers last append to sequence number {}", lastAppendedTo);
        if (lastAppendedTo.compareTo(lastMaterializedTo) <= 0) {
            LOG.debug("Skip materialization, last materialized to {} : last log to {}", lastMaterializedTo, lastAppendedTo);
            return Optional.empty();
        }
        LOG.info("Starting materialization from {} : {}", lastMaterializedTo, lastAppendedTo);
        AbstractKeyedStateBackend<K> abstractKeyedStateBackend = this.keyedStateBackend;
        long j = this.materializedId;
        this.materializedId = j + 1;
        PeriodicMaterializationManager.MaterializationRunnable materializationRunnable = new PeriodicMaterializationManager.MaterializationRunnable(abstractKeyedStateBackend.snapshot(j, System.currentTimeMillis(), this.streamFactory, CHECKPOINT_OPTIONS), lastAppendedTo);
        Iterator<ChangelogState> it = this.changelogStates.values().iterator();
        while (it.hasNext()) {
            it.next().resetWritingMetaFlag();
        }
        Iterator<ChangelogKeyGroupedPriorityQueue<?>> it2 = this.priorityQueueStatesByName.values().iterator();
        while (it2.hasNext()) {
            it2.next().resetWritingMetaFlag();
        }
        return Optional.of(materializationRunnable);
    }

    private SequenceNumber getLastAppendedTo() {
        this.stateChangelogWriter.lastAppendedSequenceNumber();
        return this.stateChangelogWriter.lastAppendedSequenceNumber();
    }

    public void updateChangelogSnapshotState(SnapshotResult<KeyedStateHandle> snapshotResult, SequenceNumber sequenceNumber) {
        LOG.info("Task {} finishes materialization, updates the snapshotState upTo {} : {}", new Object[]{this.subtaskName, sequenceNumber, snapshotResult});
        this.changelogSnapshotState = new ChangelogSnapshotState(getMaterializedResult(snapshotResult), Collections.emptyList(), sequenceNumber);
        this.stateChangelogWriter.truncate(sequenceNumber);
    }

    private List<KeyedStateHandle> getMaterializedResult(@Nonnull SnapshotResult<KeyedStateHandle> snapshotResult) {
        KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
        return jobManagerOwnedSnapshot == null ? Collections.emptyList() : Collections.singletonList(jobManagerOwnedSnapshot);
    }

    public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean z) {
        return this.keyedStateBackend.getDelegatedKeyedStateBackend(z);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13, types: [org.apache.flink.state.changelog.ChangelogState] */
    public ChangelogState getExistingStateForRecovery(String str, StateMetaInfoSnapshot.BackendStateType backendStateType) throws NoSuchElementException, UnsupportedOperationException {
        ChangelogKeyGroupedPriorityQueue<?> changelogKeyGroupedPriorityQueue;
        switch (AnonymousClass2.$SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType[backendStateType.ordinal()]) {
            case 1:
                changelogKeyGroupedPriorityQueue = this.changelogStates.get(str);
                break;
            case 2:
                changelogKeyGroupedPriorityQueue = this.priorityQueueStatesByName.get(str);
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unknown state type %s (%s)", backendStateType, str));
        }
        if (changelogKeyGroupedPriorityQueue == null) {
            throw new NoSuchElementException(String.format("%s state %s not found", backendStateType, str));
        }
        return changelogKeyGroupedPriorityQueue;
    }

    private static <T> RunnableFuture<T> toRunnableFuture(final CompletableFuture<T> completableFuture) {
        return new RunnableFuture<T>() { // from class: org.apache.flink.state.changelog.ChangelogKeyedStateBackend.1
            @Override // java.util.concurrent.RunnableFuture, java.lang.Runnable
            public void run() {
                completableFuture.join();
            }

            @Override // java.util.concurrent.Future
            public boolean cancel(boolean z) {
                return completableFuture.cancel(z);
            }

            @Override // java.util.concurrent.Future
            public boolean isCancelled() {
                return completableFuture.isCancelled();
            }

            @Override // java.util.concurrent.Future
            public boolean isDone() {
                return completableFuture.isDone();
            }

            @Override // java.util.concurrent.Future
            public T get() throws InterruptedException, ExecutionException {
                return (T) completableFuture.get();
            }

            @Override // java.util.concurrent.Future
            public T get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                return (T) completableFuture.get(j, timeUnit);
            }
        };
    }

    @VisibleForTesting
    StateChangelogWriter<ChangelogStateHandle> getChangelogWriter() {
        return this.stateChangelogWriter;
    }
}
