package org.apache.flink.runtime.operators.lifecycle.graph;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.class */
/**
 * Parallel test source that emits {@link TestDataElement}s carrying an increasing sequence
 * number, and reacts to {@link TestCommand}s delivered through a {@link TestCommandDispatcher}.
 *
 * <p>On {@link #open(Configuration)} it subscribes to commands for its operator ID and publishes
 * an {@link OperatorStartedEvent}. Every command it acts on is acknowledged with a
 * {@link TestCommandAckEvent} on the event queue.
 */
class TestEventSource extends RichSourceFunction<TestDataElement> implements ParallelSourceFunction<TestDataElement> {
    private final String operatorID;
    private final TestCommandDispatcher commandQueue;
    /** Commands received from the dispatcher and not yet processed; created in {@code open}. */
    private transient Queue<TestCommand> scheduledCommands;
    /** Loop flag for {@code run}; cleared by {@code cancel} or by a FINISH_SOURCES command. */
    private transient volatile boolean isRunning = true;
    private final TestEventQueue eventQueue;

    /**
     * @param operatorID identifier reported in every event and emitted element
     * @param eventQueue queue that receives the started/ack events
     * @param commandQueue dispatcher this source subscribes to for commands
     */
    public TestEventSource(String operatorID, TestEventQueue eventQueue, TestCommandDispatcher commandQueue) {
        this.operatorID = operatorID;
        this.eventQueue = eventQueue;
        this.commandQueue = commandQueue;
    }

    /**
     * Resets the running flag, creates the command queue, subscribes to the dispatcher for this
     * operator ID and publishes an {@link OperatorStartedEvent}.
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.isRunning = true;
        this.scheduledCommands = new LinkedBlockingQueue<>();
        this.commandQueue.subscribe(command -> {
            this.scheduledCommands.add(command);
        }, this.operatorID);
        this.eventQueue.add(new OperatorStartedEvent(
                this.operatorID,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getAttemptNumber()));
    }

    /**
     * Emits elements with sequence numbers 1, 2, 3, ... until cancelled or commanded to stop.
     *
     * @throws RuntimeException if a FAIL command is received, or if the command is unknown
     */
    public void run(SourceFunction.SourceContext<TestDataElement> sourceContext) {
        long lastSent = 0;
        while (this.isRunning) {
            // Commands are only polled once at least one element has been emitted.
            TestCommand command = lastSent == 0 ? null : this.scheduledCommands.poll();
            if (command == TestCommand.FINISH_SOURCES) {
                ack(command);
                this.isRunning = false;
            } else if (command == TestCommand.FAIL) {
                ack(command);
                throw new RuntimeException("requested to fail");
            } else if (command != null) {
                throw new RuntimeException("unknown command " + command);
            } else {
                // No pending command: emit the next element while holding the checkpoint lock.
                synchronized (sourceContext.getCheckpointLock()) {
                    lastSent++;
                    sourceContext.collect(new TestDataElement(
                            this.operatorID,
                            getRuntimeContext().getIndexOfThisSubtask(),
                            lastSent));
                }
            }
        }
    }

    /** Publishes a {@link TestCommandAckEvent} for the given command on the event queue. */
    private void ack(TestCommand testCommand) {
        this.eventQueue.add(new TestCommandAckEvent(
                this.operatorID,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getAttemptNumber(),
                testCommand));
    }

    /** Requests the {@code run} loop to stop after its current iteration. */
    public void cancel() {
        this.isRunning = false;
    }
}
