package no.kantega.svv.helper;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import no.kantega.svv.helper.serialization.MapSerde;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.jetbrains.annotations.NotNull;

/* compiled from: KafkaStreamsBuilder.kt */
@Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��8\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u000e\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\r\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\u0018��  *\u0004\b��\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003:\u0001 BK\b\u0002\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0005\u0012\u0006\u0010\u0007\u001a\u00020\u0005\u0012\u0006\u0010\b\u001a\u00020\u0005\u0012\f\u0010\t\u001a\b\u0012\u0004\u0012\u00028��0\n\u0012\f\u0010\u000b\u001a\b\u0012\u0004\u0012\u00028\u00010\n\u0012\u0006\u0010\f\u001a\u00020\r¢\u0006\u0002\u0010\u000eJ\"\u0010\u000f\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010��2\u0006\u0010\u0010\u001a\u00020\u00052\u0006\u0010\u0011\u001a\u00020\u0003J\b\u0010\u0012\u001a\u00020\rH\u0002J&\u0010\u0013\u001a\u000e\u0012\u0004\u0012\u0002H\u0014\u0012\u0004\u0012\u00028\u00010��\"\u0004\b\u0002\u0010\u00142\f\u0010\u0015\u001a\b\u0012\u0004\u0012\u0002H\u00140\nJ&\u0010\u0016\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u0002H\u00170��\"\u0004\b\u0002\u0010\u00172\f\u0010\u0018\u001a\b\u0012\u0004\u0012\u0002H\u00170\nJ\u001a\u0010\u0019\u001a\u000e\u0012\u0004\u0012\u00028��\u0012\u0004\u0012\u00028\u00010��2\u0006\u0010\f\u001a\u00020\rJ2\u0010\u001a\u001a\u00020\u001b2*\u0010\u001c\u001a&\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00028��0\u001e\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00028\u00010\u001e0\u001dJ\b\u0010\u001f\u001a\u00020\u001bH\u0002R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\t\u001a\b\u0012\u0004\u0012\u00028��0\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u000b\u0
01a\b\u0012\u0004\u0012\u00028\u00010\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n��¨\u0006!"}, d2 = {"Lno/kantega/svv/helper/KafkaStreamsBuilder;", "IN", "OUT", "", "agentName", "", "kafkaBootstrapServers", "incomingTopicName", "outgoingTopicName", "incomingSerde", "Lorg/apache/kafka/common/serialization/Serde;", "outgoingSerde", "properties", "Ljava/util/Properties;", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Lorg/apache/kafka/common/serialization/Serde;Lorg/apache/kafka/common/serialization/Serde;Ljava/util/Properties;)V", "addProperty", "key", "value", "createProps", "setIncomingSerde", "NewIN", "newInSerde", "setOutgoingSerde", "NewOUT", "newOutSerde", "setProperties", "start", "", "streamMapping", "Lkotlin/Function1;", "Lorg/apache/kafka/streams/kstream/KStream;", "waitForIncomingTopicToExist", "Companion", "kafka-agent-helper-library"})
/* loaded from: input_file:no/kantega/svv/helper/KafkaStreamsBuilder.class */
public final class KafkaStreamsBuilder<IN, OUT> {
    // Agent identifier: used as the "application.id" setting and passed to AgentHelper in start().
    private final String agentName;
    // Kafka bootstrap address: used as the "bootstrap.servers" setting and for the readiness check in start().
    private final String kafkaBootstrapServers;
    // Topic the stream reads from; start() waits until it exists before building the topology.
    private final String incomingTopicName;
    // Topic the mapped stream is written to.
    private final String outgoingTopicName;
    // Serde used to deserialize values read from the incoming topic.
    private final Serde<IN> incomingSerde;
    // Serde used to serialize values written to the outgoing topic.
    private final Serde<OUT> outgoingSerde;
    // Configuration entries held by this builder; replaced via setProperties and mutated in place by addProperty.
    private final Properties properties;
    // Singleton holder for the Kotlin companion-object members (the create factory).
    public static final Companion Companion = new Companion(null);

    /* compiled from: KafkaStreamsBuilder.kt */
    @Metadata(mv = {1, 1, 11}, bv = {1, 0, 2}, k = 1, d1 = {"��\"\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010$\n\u0002\u0010\u000e\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002JJ\u0010\u0003\u001a&\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0006\u0012\u0004\u0012\u00020\u00060\u0005\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0006\u0012\u0004\u0012\u00020\u00060\u00050\u00042\u0006\u0010\u0007\u001a\u00020\u00062\u0006\u0010\b\u001a\u00020\u00062\u0006\u0010\t\u001a\u00020\u00062\u0006\u0010\n\u001a\u00020\u0006J\u0018\u0010\u000b\u001a\u00020\f2\u0006\u0010\u0007\u001a\u00020\u00062\u0006\u0010\b\u001a\u00020\u0006H\u0002¨\u0006\r"}, d2 = {"Lno/kantega/svv/helper/KafkaStreamsBuilder$Companion;", "", "()V", "create", "Lno/kantega/svv/helper/KafkaStreamsBuilder;", "", "", "agentName", "kafkaBootstrapServers", "incomingTopicName", "outgoingTopicName", "createProps", "Ljava/util/Properties;", "kafka-agent-helper-library"})
    /* loaded from: input_file:no/kantega/svv/helper/KafkaStreamsBuilder$Companion.class */
    public static final class Companion {
        /**
         * Creates a builder whose incoming and outgoing values are both handled by {@link MapSerde}.
         *
         * @param str  agent name; also stored under {@code application.id}
         * @param str2 Kafka bootstrap servers; also stored under {@code bootstrap.servers}
         * @param str3 name of the incoming topic
         * @param str4 name of the outgoing topic
         * @return a new builder carrying the initial properties produced by {@code createProps}
         */
        @NotNull
        public final KafkaStreamsBuilder<Map<String, String>, Map<String, String>> create(@NotNull String str, @NotNull String str2, @NotNull String str3, @NotNull String str4) {
            Intrinsics.checkParameterIsNotNull(str, "agentName");
            Intrinsics.checkParameterIsNotNull(str2, "kafkaBootstrapServers");
            Intrinsics.checkParameterIsNotNull(str3, "incomingTopicName");
            Intrinsics.checkParameterIsNotNull(str4, "outgoingTopicName");

            // Evaluated in the same order as the constructor arguments they feed.
            MapSerde inboundSerde = new MapSerde();
            MapSerde outboundSerde = new MapSerde();
            Properties initialProps = createProps(str, str2);
            return new KafkaStreamsBuilder<>(str, str2, str3, str4, inboundSerde, outboundSerde, initialProps, null);
        }

        /**
         * Produces the initial property set for a freshly created builder.
         *
         * @param str  value stored under {@code application.id}
         * @param str2 value stored under {@code bootstrap.servers}
         * @return a new {@link Properties}; {@code schema.registry.url} is present only when the
         *         {@code SCHEMA_REGISTRY_URL} environment variable is set
         */
        private final Properties createProps(String str, String str2) {
            Properties config = new Properties();
            config.put("bootstrap.servers", str2);
            config.put("application.id", str);

            String keySerdeClassName = Serdes.String().getClass().getName();
            config.put("default.key.serde", keySerdeClassName);
            config.put("default.value.serde", MapSerde.class);
            config.put("buffered.records.per.partition", "2");

            String registryUrl = System.getenv("SCHEMA_REGISTRY_URL");
            if (registryUrl == null) {
                return config;
            }
            config.put("schema.registry.url", registryUrl);
            return config;
        }

        /** Not instantiable from outside; the single instance is created through the synthetic constructor. */
        private Companion() {
        }

        /** Synthetic constructor; the marker argument is ignored. */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Returns a new builder identical to this one except for the incoming value serde.
     * The {@link Properties} instance is handed over by reference, not copied.
     *
     * @param serde   serde for values read from the incoming topic
     * @param <NewIN> value type produced by {@code serde}
     * @return a new builder typed on {@code NewIN}
     */
    @NotNull
    public final <NewIN> KafkaStreamsBuilder<NewIN, OUT> setIncomingSerde(@NotNull Serde<NewIN> serde) {
        Intrinsics.checkParameterIsNotNull(serde, "newInSerde");
        KafkaStreamsBuilder<NewIN, OUT> rebuilt = new KafkaStreamsBuilder<>(
                agentName,
                kafkaBootstrapServers,
                incomingTopicName,
                outgoingTopicName,
                serde,
                outgoingSerde,
                properties);
        return rebuilt;
    }

    /**
     * Returns a new builder identical to this one except for the outgoing value serde.
     * The {@link Properties} instance is handed over by reference, not copied.
     *
     * @param serde    serde for values written to the outgoing topic
     * @param <NewOUT> value type consumed by {@code serde}
     * @return a new builder typed on {@code NewOUT}
     */
    @NotNull
    public final <NewOUT> KafkaStreamsBuilder<IN, NewOUT> setOutgoingSerde(@NotNull Serde<NewOUT> serde) {
        Intrinsics.checkParameterIsNotNull(serde, "newOutSerde");
        KafkaStreamsBuilder<IN, NewOUT> rebuilt = new KafkaStreamsBuilder<>(
                agentName,
                kafkaBootstrapServers,
                incomingTopicName,
                outgoingTopicName,
                incomingSerde,
                serde,
                properties);
        return rebuilt;
    }

    /**
     * Returns a new builder that holds the given {@link Properties} in place of the current ones.
     * The argument is stored by reference; names, topics and serdes are carried over unchanged.
     *
     * @param properties the properties object the new builder should hold
     * @return a new builder with the same type parameters
     */
    @NotNull
    public final KafkaStreamsBuilder<IN, OUT> setProperties(@NotNull Properties properties) {
        Intrinsics.checkParameterIsNotNull(properties, "properties");
        KafkaStreamsBuilder<IN, OUT> rebuilt = new KafkaStreamsBuilder<>(
                this.agentName,
                this.kafkaBootstrapServers,
                this.incomingTopicName,
                this.outgoingTopicName,
                this.incomingSerde,
                this.outgoingSerde,
                properties);
        return rebuilt;
    }

    /**
     * Stores a single entry in the {@link Properties} object held by this builder.
     * The object is mutated in place, and this same builder is returned so calls can be chained.
     *
     * @param str property key
     * @param obj property value
     * @return this builder
     */
    @NotNull
    public final KafkaStreamsBuilder<IN, OUT> addProperty(@NotNull String str, @NotNull Object obj) {
        Intrinsics.checkParameterIsNotNull(str, "key");
        Intrinsics.checkParameterIsNotNull(obj, "value");
        Properties held = this.properties;
        held.put(str, obj);
        return this;
    }

    /**
     * Builds the topology and starts a {@link KafkaStreams} instance for it.
     *
     * <p>Steps, in order: wait for Kafka to become ready, wait for the incoming topic to exist,
     * consume the incoming topic, apply {@code function1}, and write the result to the outgoing
     * topic. Each record that passes the read side or the write side is reported to an
     * {@code AgentHelper} via {@code registerRead} / {@code registerWrite}.
     *
     * <p>The created {@link KafkaStreams} instance is neither returned nor closed by this method.
     * Exceptions raised while waiting for the incoming topic propagate to the caller.
     *
     * @param function1 mapping from the incoming stream to the stream that is written out
     */
    public final void start(@NotNull Function1<? super KStream<String, IN>, ? extends KStream<String, OUT>> function1) {
        Intrinsics.checkParameterIsNotNull(function1, "streamMapping");
        // NOTE(review): 240 is presumably a timeout or retry budget - confirm against KafkaHelper.
        KafkaHelper.Companion.waitForKafkaToGetReady(this.kafkaBootstrapServers, 240);
        waitForIncomingTopicToExist();

        final AgentHelper agentHelper = new AgentHelper(this.agentName, this.kafkaBootstrapServers);
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Typed ForeachAction without a hand-written bridge method: the compiler generates the
        // correct one, so record values are no longer force-cast to String.
        KStream<String, IN> stream = streamsBuilder
                .stream(this.incomingTopicName, Consumed.with(new Serdes.StringSerde(), this.incomingSerde))
                .peek(new ForeachAction<String, IN>() {
                    @Override
                    public void apply(String key, IN value) {
                        agentHelper.registerRead(KafkaStreamsBuilder.this.incomingTopicName);
                    }
                });
        Intrinsics.checkExpressionValueIsNotNull(stream, "stream");

        KStream<String, OUT> mapped = function1.invoke(stream);
        mapped
                .peek(new ForeachAction<String, OUT>() {
                    @Override
                    public void apply(String key, OUT value) {
                        agentHelper.registerWrite(KafkaStreamsBuilder.this.outgoingTopicName);
                    }
                })
                // Only the value serde is set here; the key serde comes from the stream configuration.
                .to(this.outgoingTopicName, Produced.<String, OUT>valueSerde(this.outgoingSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), createProps());
        System.out.println("Starting streams");
        kafkaStreams.start();
    }

    /**
     * Blocks until the incoming topic can be described through a Kafka {@link AdminClient}.
     *
     * <p>Each describe attempt waits up to two minutes for a result. If the attempt fails because
     * the topic is unknown, the method sleeps five seconds and retries. Any other
     * {@link ExecutionException} is rethrown unchanged, and a timeout or an interrupt during the
     * wait or the sleep also propagates to the caller.
     *
     * <p>A single admin client is used for all attempts and is closed on every exit path.
     */
    private final void waitForIncomingTopicToExist() {
        System.out.println("Checking if incoming topic exists");
        // try-with-resources releases the client's threads and sockets even when an exception escapes.
        try (AdminClient adminClient = AdminClient.create(createProps())) {
            boolean topicExists = false;
            while (!topicExists) {
                try {
                    adminClient
                            .describeTopics(CollectionsKt.arrayListOf(new String[]{this.incomingTopicName}))
                            .all()
                            .get(2L, TimeUnit.MINUTES);
                    topicExists = true;
                } catch (ExecutionException e) {
                    // Only "topic does not exist" is retried; everything else is a real failure.
                    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                        throw e;
                    }
                    System.out.println("Incoming topic does not exist yet. Stream can not be started yet. Trying again in 5 seconds.");
                    Thread.sleep(5000L);
                }
            }
        }
        System.out.println("Ok! Incoming topic exists.");
    }

    /**
     * Assembles the configuration passed to the Kafka clients created by this builder.
     *
     * <p>Built-in settings are written first: bootstrap servers, application id, default
     * key/value serdes, {@code buffered.records.per.partition}, and {@code schema.registry.url}
     * when the {@code SCHEMA_REGISTRY_URL} environment variable is set. The entries held in this
     * builder's {@link Properties} are then applied on top, so values supplied through
     * {@code setProperties} or {@code addProperty} take effect and may override the built-ins.
     *
     * @return a new {@link Properties} instance; the builder's own object is not modified
     */
    private final Properties createProps() {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.kafkaBootstrapServers);
        config.put("application.id", this.agentName);
        config.put("default.key.serde", Serdes.String().getClass().getName());
        config.put("default.value.serde", MapSerde.class);
        config.put("buffered.records.per.partition", "2");
        String schemaRegistryUrl = System.getenv("SCHEMA_REGISTRY_URL");
        if (schemaRegistryUrl != null) {
            config.put("schema.registry.url", schemaRegistryUrl);
        }
        // Previously the builder's properties were stored but never read, so custom settings were
        // silently dropped. putAll copies the direct entries (values may be non-String objects).
        if (this.properties != null) {
            config.putAll(this.properties);
        }
        return config;
    }

    /**
     * Primary constructor. Every argument is stored as given, with no copying and no validation.
     *
     * @param str        agent name
     * @param str2       Kafka bootstrap servers
     * @param str3       incoming topic name
     * @param str4       outgoing topic name
     * @param serde      serde for incoming values
     * @param serde2     serde for outgoing values
     * @param properties properties object held by the builder
     */
    private KafkaStreamsBuilder(String str, String str2, String str3, String str4, Serde<IN> serde, Serde<OUT> serde2, Properties properties) {
        // Serdes and properties
        this.properties = properties;
        this.outgoingSerde = serde2;
        this.incomingSerde = serde;
        // Topics
        this.outgoingTopicName = str4;
        this.incomingTopicName = str3;
        // Identity and connection
        this.kafkaBootstrapServers = str2;
        this.agentName = str;
    }

    /**
     * Synthetic overload used by the companion factory. It delegates to the primary constructor;
     * the trailing marker argument is ignored.
     */
    public /* synthetic */ KafkaStreamsBuilder(
            @NotNull String str,
            @NotNull String str2,
            @NotNull String str3,
            @NotNull String str4,
            @NotNull Serde serde,
            @NotNull Serde serde2,
            @NotNull Properties properties,
            DefaultConstructorMarker defaultConstructorMarker) {
        this(str, str2, str3, str4, serde, serde2, properties);
    }
}
