package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/StageTest.class */
/**
 * Verifies that every step of a {@link Stage} receives its batches in ticket order, even when
 * the steps take different amounts of time to process a batch.
 */
public class StageTest {

    /**
     * Variant of {@link ReceiveOrderAssertingStep} used as the last step of the stage: it performs
     * the same order assertion and simulated processing, but returns {@code null} instead of the
     * batch.
     */
    private static class LastReceiveOrderAssertingStep extends ReceiveOrderAssertingStep {
        LastReceiveOrderAssertingStep(StageControl control, String name, int workAheadSize, int processors,
                long processingTime) {
            super(control, name, workAheadSize, processors, processingTime);
        }

        @Override
        protected Object process(long ticket, Object batch) {
            super.process(ticket, batch);
            // NOTE(review): presumably null signals that there is nothing to forward downstream
            // from the final step - confirm against ExecutorServiceStep.
            return null;
        }
    }

    /**
     * Step that asserts, on every {@code receive}, that the incoming ticket is exactly one higher
     * than the previously received one (the first expected ticket is 1), and that simulates work
     * by sleeping for a fixed time in {@code process}.
     */
    private static class ReceiveOrderAssertingStep extends ExecutorServiceStep<Object> {
        /** Ticket of the most recently received batch; 0 until the first batch arrives. */
        private final AtomicLong lastTicket;
        /** Time, in milliseconds, that {@code process} sleeps for each batch. */
        private final long processingTime;

        /**
         * @param control stage control handed to the super class.
         * @param name name of this step.
         * @param workAheadSize forwarded as the third super constructor argument; presumably the
         * work-ahead size - confirm against ExecutorServiceStep.
         * @param processors forwarded as the fifth super constructor argument; presumably the
         * initial number of processors - confirm against ExecutorServiceStep.
         * @param processingTime milliseconds to sleep per processed batch.
         */
        ReceiveOrderAssertingStep(StageControl control, String name, int workAheadSize, int processors,
                long processingTime) {
            super(control, name, workAheadSize, 100, processors, new StatsProvider[0]);
            this.lastTicket = new AtomicLong();
            this.processingTime = processingTime;
        }

        /**
         * Asserts that {@code ticket} is the next one in sequence, then delegates to the super
         * class.
         */
        public long receive(long ticket, Object batch) {
            Assert.assertEquals(lastTicket.incrementAndGet(), ticket);
            return super.receive(ticket, batch);
        }

        /**
         * Sleeps for the configured processing time and returns the batch unchanged.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
         * interrupted; the thread's interrupt flag is restored first.
         */
        protected Object process(long ticket, Object batch) {
            try {
                Thread.sleep(processingTime);
                return batch;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Runs a stage made of one producer, three order-asserting steps with processing times of
     * 0, 1 and 2 ms, and a final order-asserting step, then checks that every step reports the
     * expected number of done batches.
     */
    @Test
    public void shouldReceiveBatchesInOrder() throws Exception {
        // NOTE(review): assumes ProducerStep's third constructor argument is the batch size it
        // passes to nextBatchOrNull; the expected batch count below relies on that.
        final int batchSize = 10;
        final long batches = 1000;
        final long items = batches * batchSize;

        Stage stage = new Stage("Test stage", new Configuration.Default(), true);
        try {
            stage.add(new ProducerStep<Object>(stage.control(), "Producer", batchSize, 100) {
                private final Object theObject = new Object();
                private long produced;

                protected Object nextBatchOrNull(int size) {
                    if (produced >= items) {
                        // All items have been handed out.
                        return null;
                    }
                    Object[] batch = new Object[size];
                    Arrays.fill(batch, theObject);
                    produced += size;
                    return batch;
                }
            });
            for (int i = 0; i < 3; i++) {
                // The loop index doubles as the per-batch processing time in milliseconds.
                stage.add(new ReceiveOrderAssertingStep(stage.control(), "Step" + i, 20, 2, i));
            }
            stage.add(new LastReceiveOrderAssertingStep(stage.control(), "Final step", 20, 2, 0L));

            StageExecution execution = stage.execute();
            new ExecutionSupervisor(ExecutionMonitors.invisible()).supervise(new StageExecution[]{execution});

            for (Step<?> step : execution.steps()) {
                Assert.assertEquals("Done batches of " + step, batches,
                        step.stats().stat(Keys.done_batches).asLong());
            }
        } finally {
            // Close the stage even if supervision or an assertion fails.
            stage.close();
        }
    }
}
