package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.class */
/**
 * {@link VertexParallelismDecider} that derives a vertex's parallelism from the amount of data
 * produced by the blocking results it consumes.
 *
 * <p>A vertex without consumed results gets the configured default source parallelism. Otherwise
 * the parallelism is {@code ceil(nonBroadcastBytes / (dataVolumePerTask - broadcastBytes))}, where
 * the broadcast size is capped at {@link #CAP_RATIO_OF_BROADCAST} of the per-task data volume, and
 * the result is clamped into {@code [minParallelism, maxParallelism]}.
 */
public class DefaultVertexParallelismDecider implements VertexParallelismDecider {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultVertexParallelismDecider.class);

    /** Largest fraction of the per-task data volume that broadcast data may occupy in the calculation. */
    private static final double CAP_RATIO_OF_BROADCAST = 0.5d;

    private final int maxParallelism;
    private final int minParallelism;
    private final long dataVolumePerTask;
    private final int defaultSourceParallelism;

    /**
     * Validates and stores the decider settings.
     *
     * @param maxParallelism upper bound of the decided parallelism; must be {@code >= minParallelism}
     * @param minParallelism lower bound of the decided parallelism; must be positive
     * @param dataVolumePerTask data volume each task is expected to process; must not be null
     * @param defaultSourceParallelism parallelism returned for vertices without consumed results;
     *     must be positive
     */
    private DefaultVertexParallelismDecider(
            int maxParallelism,
            int minParallelism,
            MemorySize dataVolumePerTask,
            int defaultSourceParallelism) {
        Preconditions.checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0.");
        Preconditions.checkArgument(
                maxParallelism >= minParallelism,
                "Maximum parallelism should be greater than or equal to the minimum parallelism.");
        Preconditions.checkArgument(
                defaultSourceParallelism > 0, "The default source parallelism must be larger than 0.");
        Preconditions.checkNotNull(dataVolumePerTask);
        this.maxParallelism = maxParallelism;
        this.minParallelism = minParallelism;
        this.dataVolumePerTask = dataVolumePerTask.getBytes();
        this.defaultSourceParallelism = defaultSourceParallelism;
    }

    /**
     * Decides the parallelism of a vertex.
     *
     * @param list the blocking results consumed by the vertex
     * @return the default source parallelism when {@code list} is empty, otherwise the parallelism
     *     computed from the consumed data sizes
     */
    @Override // org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider
    public int decideParallelismForVertex(List<BlockingResultInfo> list) {
        if (list.isEmpty()) {
            return this.defaultSourceParallelism;
        }
        return calculateParallelism(list);
    }

    /**
     * Computes the parallelism from the broadcast and non-broadcast bytes of the consumed results
     * and clamps it into the configured bounds.
     */
    private int calculateParallelism(List<BlockingResultInfo> consumedResults) {
        long broadcastBytes = totalPartitionBytes(consumedResults, true);
        final long nonBroadcastBytes = totalPartitionBytes(consumedResults, false);

        final long maxBroadcastBytes = (long) Math.ceil(this.dataVolumePerTask * CAP_RATIO_OF_BROADCAST);
        if (broadcastBytes > maxBroadcastBytes) {
            LOG.info(
                    "The size of broadcast data {} is larger than the expected maximum value {} ('{}' * {}). Use {} as the size of broadcast data to decide the parallelism.",
                    new MemorySize(broadcastBytes),
                    new MemorySize(maxBroadcastBytes),
                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
                    CAP_RATIO_OF_BROADCAST,
                    new MemorySize(maxBroadcastBytes));
            broadcastBytes = maxBroadcastBytes;
        }

        // The division must happen in floating point: dividing two longs truncates first, which
        // makes Math.ceil a no-op and rounds the parallelism down instead of up. A zero denominator
        // (possible when the per-task volume is 0 or 1 byte) then yields Infinity/NaN rather than
        // an ArithmeticException; the int cast maps those to Integer.MAX_VALUE/0, and the clamping
        // below brings the value back into the configured bounds.
        int parallelism =
                (int) Math.ceil((double) nonBroadcastBytes / (this.dataVolumePerTask - broadcastBytes));
        LOG.debug(
                "The size of broadcast data is {}, the size of non-broadcast data is {}, the initially decided parallelism is {}.",
                new MemorySize(broadcastBytes),
                new MemorySize(nonBroadcastBytes),
                parallelism);

        if (parallelism < this.minParallelism) {
            LOG.info(
                    "The initially decided parallelism {} is smaller than the minimum parallelism {} (which is configured by '{}'). Use {} as the finally decided parallelism.",
                    parallelism,
                    this.minParallelism,
                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key(),
                    this.minParallelism);
            parallelism = this.minParallelism;
        } else if (parallelism > this.maxParallelism) {
            LOG.info(
                    "The initially decided parallelism {} is larger than the maximum parallelism {} (which is configured by '{}'). Use {} as the finally decided parallelism.",
                    parallelism,
                    this.maxParallelism,
                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(),
                    this.maxParallelism);
            parallelism = this.maxParallelism;
        }
        return parallelism;
    }

    /**
     * Sums the blocking partition sizes of every consumed result whose broadcast flag equals
     * {@code broadcast}.
     */
    private static long totalPartitionBytes(List<BlockingResultInfo> consumedResults, boolean broadcast) {
        return consumedResults.stream()
                .filter(result -> result.isBroadcast() == broadcast)
                .mapToLong(result -> result.getBlockingPartitionSizes().stream().reduce(0L, Long::sum))
                .sum();
    }

    /**
     * Creates a decider from the adaptive batch scheduler options (max/min parallelism, data volume
     * per task and default source parallelism) of the given configuration.
     *
     * @param configuration the configuration to read the options from
     * @return a decider initialized with the configured values
     */
    public static DefaultVertexParallelismDecider from(Configuration configuration) {
        int maxParallelism =
                configuration.getInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM);
        int minParallelism =
                configuration.getInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM);
        MemorySize dataVolumePerTask =
                configuration.get(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK);
        int defaultSourceParallelism =
                configuration.get(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM);
        return new DefaultVertexParallelismDecider(
                maxParallelism, minParallelism, dataVolumePerTask, defaultSourceParallelism);
    }
}
