package org.apache.flink.runtime.io.network.api.serialization;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.InstantiationUtil;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/serialization/EventSerializer.class */
/**
 * Static helpers that convert {@link AbstractEvent}s to and from their binary form.
 *
 * <p>Every serialized event starts with a 4-byte big-endian type tag (one of the
 * {@code *_EVENT} constants below) followed by a type-specific payload. Well-known
 * event classes get a compact hand-written encoding; any other event is written as
 * {@link #OTHER_EVENT}, its class name, and whatever {@code AbstractEvent.write} emits.
 */
public class EventSerializer {
    // Type tags written as the leading int of every serialized event.
    private static final int END_OF_PARTITION_EVENT = 0;
    private static final int CHECKPOINT_BARRIER_EVENT = 1;
    private static final int END_OF_SUPERSTEP_EVENT = 2;
    private static final int OTHER_EVENT = 3;
    private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
    private static final int END_OF_CHANNEL_STATE_EVENT = 5;
    private static final int ANNOUNCEMENT_EVENT = 6;
    private static final int VIRTUAL_CHANNEL_SELECTOR_EVENT = 7;
    private static final int END_OF_USER_RECORDS_EVENT = 8;

    // Wire codes for the checkpoint type inside a serialized checkpoint barrier.
    private static final int CHECKPOINT_TYPE_CHECKPOINT = 0;
    private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;
    private static final int CHECKPOINT_TYPE_SAVEPOINT_SUSPEND = 2;
    private static final int CHECKPOINT_TYPE_SAVEPOINT_TERMINATE = 3;
    private static final int CHECKPOINT_TYPE_FULL_CHECKPOINT = 4;

    /**
     * Serializes the given event into a heap {@link ByteBuffer} positioned at the start of
     * the serialized data.
     *
     * @param abstractEvent the event to serialize
     * @return a buffer whose remaining bytes are the serialized event
     * @throws IOException if a generic event fails to write itself, or a checkpoint barrier
     *     carries an unknown checkpoint type
     */
    public static ByteBuffer toSerializedEvent(AbstractEvent abstractEvent) throws IOException {
        final Class<?> eventClass = abstractEvent.getClass();

        if (eventClass == EndOfPartitionEvent.class) {
            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_PARTITION_EVENT});
        }
        if (eventClass == CheckpointBarrier.class) {
            return serializeCheckpointBarrier((CheckpointBarrier) abstractEvent);
        }
        if (eventClass == EndOfSuperstepEvent.class) {
            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_SUPERSTEP_EVENT});
        }
        if (eventClass == EndOfChannelStateEvent.class) {
            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_CHANNEL_STATE_EVENT});
        }
        if (eventClass == EndOfData.class) {
            // Type tag followed by a single byte holding the stop mode's ordinal.
            final byte stopMode = (byte) ((EndOfData) abstractEvent).getStopMode().ordinal();
            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_USER_RECORDS_EVENT, stopMode});
        }
        if (eventClass == CancelCheckpointMarker.class) {
            // Absolute puts: the position stays at 0, so no flip() is needed.
            final ByteBuffer marker = ByteBuffer.allocate(12);
            marker.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
            marker.putLong(4, ((CancelCheckpointMarker) abstractEvent).getCheckpointId());
            return marker;
        }
        if (eventClass == EventAnnouncement.class) {
            // Layout: type tag | sequence number | serialized announced event.
            final EventAnnouncement announcement = (EventAnnouncement) abstractEvent;
            final ByteBuffer announcedBytes = toSerializedEvent(announcement.getAnnouncedEvent());
            final ByteBuffer result = ByteBuffer.allocate(8 + announcedBytes.capacity());
            result.putInt(0, ANNOUNCEMENT_EVENT);
            result.putInt(4, announcement.getSequenceNumber());
            result.position(8);
            result.put(announcedBytes);
            result.flip();
            return result;
        }
        if (eventClass == SubtaskConnectionDescriptor.class) {
            // Layout: type tag | input subtask index | output subtask index.
            final SubtaskConnectionDescriptor descriptor = (SubtaskConnectionDescriptor) abstractEvent;
            final ByteBuffer result = ByteBuffer.allocate(12);
            result.putInt(VIRTUAL_CHANNEL_SELECTOR_EVENT);
            result.putInt(descriptor.getInputSubtaskIndex());
            result.putInt(descriptor.getOutputSubtaskIndex());
            result.flip();
            return result;
        }

        // Generic fallback: type tag, class name, then the event's own serialization.
        try {
            final DataOutputSerializer serializer = new DataOutputSerializer(128);
            serializer.writeInt(OTHER_EVENT);
            serializer.writeUTF(abstractEvent.getClass().getName());
            abstractEvent.write(serializer);
            return serializer.wrapAsByteBuffer();
        } catch (IOException e) {
            throw new IOException("Error while serializing event.", e);
        }
    }

    /**
     * Deserializes one event from the buffer, advancing the buffer's position past the bytes
     * that were read. The buffer is read as big-endian; its original byte order is restored
     * before this method returns or throws.
     *
     * @param byteBuffer the buffer positioned at the event's type tag
     * @param classLoader class loader used to resolve the class of generic events
     * @return the deserialized event
     * @throws IOException if fewer than 4 bytes remain, the type tag is unknown, or a generic
     *     event cannot be loaded, instantiated, or read
     */
    public static AbstractEvent fromSerializedEvent(ByteBuffer byteBuffer, ClassLoader classLoader) throws IOException {
        if (byteBuffer.remaining() < 4) {
            throw new IOException("Incomplete event");
        }

        final ByteOrder originalOrder = byteBuffer.order();
        byteBuffer.order(ByteOrder.BIG_ENDIAN);
        try {
            final int type = byteBuffer.getInt();
            switch (type) {
                case END_OF_PARTITION_EVENT:
                    return EndOfPartitionEvent.INSTANCE;
                case CHECKPOINT_BARRIER_EVENT:
                    return deserializeCheckpointBarrier(byteBuffer);
                case END_OF_SUPERSTEP_EVENT:
                    return EndOfSuperstepEvent.INSTANCE;
                case END_OF_CHANNEL_STATE_EVENT:
                    return EndOfChannelStateEvent.INSTANCE;
                case END_OF_USER_RECORDS_EVENT:
                    return new EndOfData(StopMode.values()[byteBuffer.get()]);
                case CANCEL_CHECKPOINT_MARKER_EVENT:
                    return new CancelCheckpointMarker(byteBuffer.getLong());
                case ANNOUNCEMENT_EVENT: {
                    // The sequence number precedes the nested event on the wire (see
                    // toSerializedEvent), so it must be consumed first. Reading the nested
                    // event first would interpret the sequence number as a type tag.
                    final int sequenceNumber = byteBuffer.getInt();
                    final AbstractEvent announcedEvent = fromSerializedEvent(byteBuffer, classLoader);
                    return new EventAnnouncement(announcedEvent, sequenceNumber);
                }
                case VIRTUAL_CHANNEL_SELECTOR_EVENT: {
                    final int inputSubtaskIndex = byteBuffer.getInt();
                    final int outputSubtaskIndex = byteBuffer.getInt();
                    return new SubtaskConnectionDescriptor(inputSubtaskIndex, outputSubtaskIndex);
                }
                case OTHER_EVENT:
                    return deserializeOtherEvent(byteBuffer, classLoader);
                default:
                    throw new IOException("Corrupt byte stream for event");
            }
        } finally {
            // Always hand the buffer back in the byte order the caller gave it to us.
            byteBuffer.order(originalOrder);
        }
    }

    /**
     * Reads a generic event: its class name, then the state the event reads for itself.
     * Any failure, including the more specific class-loading errors raised inside, is
     * wrapped in a single {@link IOException}.
     */
    private static AbstractEvent deserializeOtherEvent(ByteBuffer byteBuffer, ClassLoader classLoader) throws IOException {
        try {
            final DataInputView input = new DataInputDeserializer(byteBuffer);
            final String className = input.readUTF();
            try {
                final AbstractEvent event = (AbstractEvent) InstantiationUtil.instantiate(
                        classLoader.loadClass(className).asSubclass(AbstractEvent.class), AbstractEvent.class);
                event.read(input);
                return event;
            } catch (ClassCastException e) {
                throw new IOException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
            } catch (ClassNotFoundException e) {
                throw new IOException("Could not load event class '" + className + "'.", e);
            }
        } catch (Exception e) {
            throw new IOException("Error while deserializing or instantiating event.", e);
        }
    }

    /**
     * Serializes a checkpoint barrier.
     *
     * <p>Layout: type tag (int) | checkpoint id (long) | timestamp (long) | checkpoint type
     * code (int) | location length or -1 for the default location (int) | location bytes |
     * alignment ordinal (byte) | aligned checkpoint timeout (long).
     *
     * @throws IOException if the barrier's checkpoint type has no wire code
     */
    private static ByteBuffer serializeCheckpointBarrier(CheckpointBarrier checkpointBarrier) throws IOException {
        final CheckpointOptions options = checkpointBarrier.getCheckpointOptions();
        final CheckpointType checkpointType = options.getCheckpointType();

        // Explicit codes rather than ordinal(), so the mapping is spelled out in one place.
        final int typeCode;
        if (checkpointType == CheckpointType.CHECKPOINT) {
            typeCode = CHECKPOINT_TYPE_CHECKPOINT;
        } else if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
            typeCode = CHECKPOINT_TYPE_FULL_CHECKPOINT;
        } else if (checkpointType == CheckpointType.SAVEPOINT) {
            typeCode = CHECKPOINT_TYPE_SAVEPOINT;
        } else if (checkpointType == CheckpointType.SAVEPOINT_SUSPEND) {
            typeCode = CHECKPOINT_TYPE_SAVEPOINT_SUSPEND;
        } else if (checkpointType == CheckpointType.SAVEPOINT_TERMINATE) {
            typeCode = CHECKPOINT_TYPE_SAVEPOINT_TERMINATE;
        } else {
            throw new IOException("Unknown checkpoint type: " + checkpointType);
        }

        final byte[] locationBytes = options.getTargetLocation().isDefaultReference()
                ? null
                : options.getTargetLocation().getReferenceBytes();

        // The fixed fields below add up to 37 bytes; the capacity of 38 is kept as-is because
        // callers that wrap the backing array observe it. flip() limits the readable part.
        final ByteBuffer buf = ByteBuffer.allocate(38 + (locationBytes == null ? 0 : locationBytes.length));
        buf.putInt(CHECKPOINT_BARRIER_EVENT);
        buf.putLong(checkpointBarrier.getId());
        buf.putLong(checkpointBarrier.getTimestamp());
        buf.putInt(typeCode);
        if (locationBytes == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(locationBytes.length);
            buf.put(locationBytes);
        }
        buf.put((byte) options.getAlignment().ordinal());
        buf.putLong(options.getAlignedCheckpointTimeout());
        buf.flip();
        return buf;
    }

    /**
     * Reads a checkpoint barrier whose type tag has already been consumed; the field order
     * mirrors {@link #serializeCheckpointBarrier(CheckpointBarrier)}.
     *
     * @throws IOException if the checkpoint type code is unknown
     */
    private static CheckpointBarrier deserializeCheckpointBarrier(ByteBuffer byteBuffer) throws IOException {
        final long checkpointId = byteBuffer.getLong();
        final long timestamp = byteBuffer.getLong();
        final int typeCode = byteBuffer.getInt();
        final int locationLength = byteBuffer.getInt();

        final CheckpointType checkpointType;
        switch (typeCode) {
            case CHECKPOINT_TYPE_CHECKPOINT:
                checkpointType = CheckpointType.CHECKPOINT;
                break;
            case CHECKPOINT_TYPE_FULL_CHECKPOINT:
                checkpointType = CheckpointType.FULL_CHECKPOINT;
                break;
            case CHECKPOINT_TYPE_SAVEPOINT:
                checkpointType = CheckpointType.SAVEPOINT;
                break;
            case CHECKPOINT_TYPE_SAVEPOINT_SUSPEND:
                checkpointType = CheckpointType.SAVEPOINT_SUSPEND;
                break;
            case CHECKPOINT_TYPE_SAVEPOINT_TERMINATE:
                checkpointType = CheckpointType.SAVEPOINT_TERMINATE;
                break;
            default:
                throw new IOException("Unknown checkpoint type code: " + typeCode);
        }

        final CheckpointStorageLocationReference location;
        if (locationLength == -1) {
            // -1 is the marker written for the default location reference.
            location = CheckpointStorageLocationReference.getDefault();
        } else {
            final byte[] locationBytes = new byte[locationLength];
            byteBuffer.get(locationBytes);
            location = new CheckpointStorageLocationReference(locationBytes);
        }

        final CheckpointOptions.AlignmentType alignment = CheckpointOptions.AlignmentType.values()[byteBuffer.get()];
        final long alignedCheckpointTimeout = byteBuffer.getLong();

        return new CheckpointBarrier(
                checkpointId,
                timestamp,
                new CheckpointOptions(checkpointType, location, alignment, alignedCheckpointTimeout));
    }

    /**
     * Serializes the event and wraps the resulting bytes in a {@link NetworkBuffer} whose
     * size is set to the number of serialized bytes.
     *
     * @param abstractEvent the event to serialize
     * @param z forwarded to {@code Buffer.DataType.getDataType} together with the event
     * @return a buffer containing the serialized event
     * @throws IOException if serialization fails
     */
    public static Buffer toBuffer(AbstractEvent abstractEvent, boolean z) throws IOException {
        final ByteBuffer serialized = toSerializedEvent(abstractEvent);
        final MemorySegment segment = MemorySegmentFactory.wrap(serialized.array());
        final NetworkBuffer buffer = new NetworkBuffer(
                segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.getDataType(abstractEvent, z));
        buffer.setSize(serialized.remaining());
        return buffer;
    }

    /**
     * Serializes the event and exposes the resulting bytes through a {@link BufferConsumer}.
     *
     * @param abstractEvent the event to serialize
     * @param z forwarded to {@code Buffer.DataType.getDataType} together with the event
     * @return a consumer over the serialized event
     * @throws IOException if serialization fails
     */
    public static BufferConsumer toBufferConsumer(AbstractEvent abstractEvent, boolean z) throws IOException {
        final ByteBuffer serialized = toSerializedEvent(abstractEvent);
        final MemorySegment segment = MemorySegmentFactory.wrap(serialized.array());
        // NOTE(review): the consumer is sized with segment.size() rather than
        // serialized.remaining() as toBuffer does; confirm this is intended for events whose
        // backing array is larger than the serialized payload.
        return new BufferConsumer(
                new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.getDataType(abstractEvent, z)),
                segment.size());
    }

    /**
     * Deserializes the event contained in the readable part of the given buffer.
     *
     * @param buffer the buffer holding a serialized event
     * @param classLoader class loader used to resolve the class of generic events
     * @return the deserialized event
     * @throws IOException if deserialization fails
     */
    public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
        return fromSerializedEvent(buffer.getNioBufferReadable(), classLoader);
    }
}
