package org.apache.flink.table.runtime.operators.join.window;

import java.time.ZoneId;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.state.WindowListState;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.class */
public abstract class WindowJoinOperator extends TableStreamOperator<RowData> implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, Long>, KeyContext {
    private static final long serialVersionUID = 1;
    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = "leftNumLateRecordsDropped";
    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "leftLateRecordsDroppedRate";
    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = "rightNumLateRecordsDropped";
    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "rightLateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
    protected final RowDataSerializer leftSerializer;
    protected final RowDataSerializer rightSerializer;
    private final GeneratedJoinCondition generatedJoinCondition;
    private final int leftWindowEndIndex;
    private final int rightWindowEndIndex;
    private final boolean[] filterNullKeys;
    private final ZoneId shiftTimeZone;
    private transient WindowTimerService<Long> windowTimerService;
    protected transient JoinConditionWithNullFilters joinCondition;
    protected transient TimestampedCollector<RowData> collector;
    private transient WindowListState<Long> leftWindowState;
    private transient WindowListState<Long> rightWindowState;
    private transient Counter leftNumLateRecordsDropped;
    private transient Meter leftLateRecordsDroppedRate;
    private transient Counter rightNumLateRecordsDropped;
    private transient Meter rightLateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$AbstractOuterJoinOperator.class */
    private static abstract class AbstractOuterJoinOperator extends WindowJoinOperator {
        private static final long serialVersionUID = 1;
        private transient RowData leftNullRow;
        private transient RowData rightNullRow;
        private transient JoinedRowData outRow;

        AbstractOuterJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId) {
            super(typeSerializer, typeSerializer2, generatedJoinCondition, i, i2, zArr, zoneId);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator, org.apache.flink.table.runtime.operators.TableStreamOperator
        public void open() throws Exception {
            super.open();
            this.leftNullRow = new GenericRowData(this.leftSerializer.getArity());
            this.rightNullRow = new GenericRowData(this.rightSerializer.getArity());
            this.outRow = new JoinedRowData();
        }

        protected void outputNullPadding(RowData rowData, boolean z) {
            if (z) {
                this.outRow.replace(rowData, this.rightNullRow);
            } else {
                this.outRow.replace(this.leftNullRow, rowData);
            }
            this.outRow.setRowKind(RowKind.INSERT);
            this.collector.collect(this.outRow);
        }

        protected void outputNullPadding(Iterable<RowData> iterable, boolean z) {
            Iterator<RowData> it = iterable.iterator();
            while (it.hasNext()) {
                outputNullPadding(it.next(), z);
            }
        }

        protected void output(RowData rowData, RowData rowData2, boolean z) {
            if (z) {
                this.outRow.replace(rowData, rowData2);
            } else {
                this.outRow.replace(rowData2, rowData);
            }
            this.outRow.setRowKind(RowKind.INSERT);
            this.collector.collect(this.outRow);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$FullOuterJoinOperator.class */
    static class FullOuterJoinOperator extends AbstractOuterJoinOperator {
        private static final long serialVersionUID = 1;

        /* JADX INFO: Access modifiers changed from: package-private */
        public FullOuterJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId) {
            super(typeSerializer, typeSerializer2, generatedJoinCondition, i, i2, zArr, zoneId);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator
        public void join(Iterable<RowData> iterable, Iterable<RowData> iterable2) {
            if (iterable == null && iterable2 == null) {
                return;
            }
            if (iterable2 == null) {
                outputNullPadding(iterable, true);
                return;
            }
            if (iterable == null) {
                outputNullPadding(iterable2, false);
                return;
            }
            IdentityHashMap identityHashMap = new IdentityHashMap();
            for (RowData rowData : iterable) {
                boolean z = false;
                for (RowData rowData2 : iterable2) {
                    if (this.joinCondition.apply(rowData, rowData2)) {
                        output(rowData, rowData2, true);
                        z = true;
                        identityHashMap.put(rowData2, Boolean.TRUE);
                    }
                }
                if (!z) {
                    outputNullPadding(rowData, true);
                }
            }
            for (RowData rowData3 : iterable2) {
                if (!identityHashMap.containsKey(rowData3)) {
                    outputNullPadding(rowData3, false);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$InnerJoinOperator.class */
    static class InnerJoinOperator extends WindowJoinOperator {
        private transient JoinedRowData outRow;

        /* JADX INFO: Access modifiers changed from: package-private */
        public InnerJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId) {
            super(typeSerializer, typeSerializer2, generatedJoinCondition, i, i2, zArr, zoneId);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator, org.apache.flink.table.runtime.operators.TableStreamOperator
        public void open() throws Exception {
            super.open();
            this.outRow = new JoinedRowData();
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator
        public void join(Iterable<RowData> iterable, Iterable<RowData> iterable2) {
            if (iterable == null || iterable2 == null) {
                return;
            }
            for (RowData rowData : iterable) {
                for (RowData rowData2 : iterable2) {
                    if (this.joinCondition.apply(rowData, rowData2)) {
                        this.outRow.setRowKind(RowKind.INSERT);
                        this.outRow.replace(rowData, rowData2);
                        this.collector.collect(this.outRow);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$LeftOuterJoinOperator.class */
    static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {
        private static final long serialVersionUID = 1;

        /* JADX INFO: Access modifiers changed from: package-private */
        public LeftOuterJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId) {
            super(typeSerializer, typeSerializer2, generatedJoinCondition, i, i2, zArr, zoneId);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator
        public void join(Iterable<RowData> iterable, Iterable<RowData> iterable2) {
            if (iterable == null) {
                return;
            }
            if (iterable2 == null) {
                outputNullPadding(iterable, true);
                return;
            }
            for (RowData rowData : iterable) {
                boolean z = false;
                for (RowData rowData2 : iterable2) {
                    if (this.joinCondition.apply(rowData, rowData2)) {
                        output(rowData, rowData2, true);
                        z = true;
                    }
                }
                if (!z) {
                    outputNullPadding(rowData, true);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$RightOuterJoinOperator.class */
    static class RightOuterJoinOperator extends AbstractOuterJoinOperator {
        private static final long serialVersionUID = 1;

        /* JADX INFO: Access modifiers changed from: package-private */
        public RightOuterJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId) {
            super(typeSerializer, typeSerializer2, generatedJoinCondition, i, i2, zArr, zoneId);
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator
        public void join(Iterable<RowData> iterable, Iterable<RowData> iterable2) {
            if (iterable2 == null) {
                return;
            }
            if (iterable == null) {
                outputNullPadding(iterable2, false);
                return;
            }
            for (RowData rowData : iterable2) {
                boolean z = false;
                for (RowData rowData2 : iterable) {
                    if (this.joinCondition.apply(rowData2, rowData)) {
                        output(rowData2, rowData, true);
                        z = true;
                    }
                }
                if (!z) {
                    outputNullPadding(rowData, false);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator$SemiAntiJoinOperator.class */
    static class SemiAntiJoinOperator extends WindowJoinOperator {
        private final boolean isAntiJoin;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SemiAntiJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, boolean z, ZoneId zoneId) {
            super(typeSerializer, typeSerializer2, generatedJoinCondition, i, i2, zArr, zoneId);
            this.isAntiJoin = z;
        }

        @Override // org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator
        public void join(Iterable<RowData> iterable, Iterable<RowData> iterable2) {
            if (iterable == null) {
                return;
            }
            if (iterable2 == null) {
                if (this.isAntiJoin) {
                    Iterator<RowData> it = iterable.iterator();
                    while (it.hasNext()) {
                        this.collector.collect(it.next());
                    }
                    return;
                }
                return;
            }
            for (RowData rowData : iterable) {
                boolean z = false;
                Iterator<RowData> it2 = iterable2.iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    if (this.joinCondition.apply(rowData, it2.next())) {
                        z = true;
                        break;
                    }
                }
                if (z) {
                    if (!this.isAntiJoin) {
                        this.collector.collect(rowData);
                    }
                } else if (this.isAntiJoin) {
                    this.collector.collect(rowData);
                }
            }
        }
    }

    WindowJoinOperator(TypeSerializer<RowData> typeSerializer, TypeSerializer<RowData> typeSerializer2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, boolean[] zArr, ZoneId zoneId) {
        this.leftSerializer = (RowDataSerializer) typeSerializer;
        this.rightSerializer = (RowDataSerializer) typeSerializer2;
        this.generatedJoinCondition = generatedJoinCondition;
        this.leftWindowEndIndex = i;
        this.rightWindowEndIndex = i2;
        this.filterNullKeys = zArr;
        this.shiftTimeZone = zoneId;
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        LongSerializer longSerializer = LongSerializer.INSTANCE;
        this.windowTimerService = new WindowTimerServiceImpl(getInternalTimerService("window-timers", longSerializer, this), this.shiftTimeZone);
        this.joinCondition = new JoinConditionWithNullFilters(this.generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()), this.filterNullKeys, this);
        this.joinCondition.setRuntimeContext(getRuntimeContext());
        this.joinCondition.open(new Configuration());
        this.leftWindowState = new WindowListState<>((ListState) getOrCreateKeyedState(longSerializer, new ListStateDescriptor(LEFT_RECORDS_STATE_NAME, this.leftSerializer)));
        this.rightWindowState = new WindowListState<>((ListState) getOrCreateKeyedState(longSerializer, new ListStateDescriptor(RIGHT_RECORDS_STATE_NAME, this.rightSerializer)));
        this.leftNumLateRecordsDropped = this.metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.leftLateRecordsDroppedRate = this.metrics.meter(LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(this.leftNumLateRecordsDropped));
        this.rightNumLateRecordsDropped = this.metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.rightLateRecordsDroppedRate = this.metrics.meter(RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(this.rightNumLateRecordsDropped));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long currentWatermark = this.windowTimerService.currentWatermark();
            if (currentWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(this.windowTimerService.currentProcessingTime() - currentWatermark);
        });
    }

    public void close() throws Exception {
        super.close();
        this.collector = null;
        if (this.joinCondition != null) {
            this.joinCondition.close();
        }
    }

    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        processElement(streamRecord, this.leftWindowEndIndex, this.leftLateRecordsDroppedRate, this.leftWindowState);
    }

    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        processElement(streamRecord, this.rightWindowEndIndex, this.rightLateRecordsDroppedRate, this.rightWindowState);
    }

    private void processElement(StreamRecord<RowData> streamRecord, int i, Meter meter, WindowListState<Long> windowListState) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        long j = rowData.getLong(i);
        if (TimeWindowUtil.isWindowFired(j, this.windowTimerService.currentWatermark(), this.shiftTimeZone)) {
            meter.markEvent();
        } else {
            if (!RowDataUtil.isAccumulateMsg(rowData)) {
                throw new UnsupportedOperationException("This is a bug and should not happen. Please file an issue.");
            }
            windowListState.add(Long.valueOf(j), rowData);
            this.windowTimerService.registerEventTimeWindowTimer(Long.valueOf(j));
        }
    }

    public void onProcessingTime(InternalTimer<RowData, Long> internalTimer) throws Exception {
        throw new UnsupportedOperationException("This is a bug and should not happen. Please file an issue.");
    }

    public void onEventTime(InternalTimer<RowData, Long> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        Long l = (Long) internalTimer.getNamespace();
        List<RowData> list = this.leftWindowState.get(l);
        List<RowData> list2 = this.rightWindowState.get(l);
        join(list, list2);
        if (list != null) {
            this.leftWindowState.clear(l);
        }
        if (list2 != null) {
            this.rightWindowState.clear(l);
        }
    }

    public abstract void join(Iterable<RowData> iterable, Iterable<RowData> iterable2);
}
