package com.appian.komodo.topology;

import com.appian.komodo.api.AutoGson;
import com.appian.komodo.config.Engine;
import com.appian.komodo.config.EngineId;
import com.appian.komodo.util.function.ImmutableCollectionCollectors;
import com.google.auto.value.AutoValue;
import com.spotify.dns.DnsSrvResolver;
import com.spotify.dns.DnsSrvResolvers;
import com.typesafe.config.Config;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import komodo.shaded.com.google.common.annotations.VisibleForTesting;
import komodo.shaded.com.google.common.base.Preconditions;
import komodo.shaded.com.google.common.collect.ImmutableList;
import komodo.shaded.com.google.common.collect.ImmutableSet;
import komodo.shaded.com.google.common.net.HostAndPort;
import komodo.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
@AutoGson
/* loaded from: input_file:com/appian/komodo/topology/DNSBackedTopologySettings.class */
public abstract class DNSBackedTopologySettings extends TopologySettings {
    private static final String HOST_PORT_SEPARATOR = ":";
    private static final String HEADLESS_SUFFIX = "headless";
    private static final String PORT_PROTOCOL = "tcp";
    private static final String KAFKA_PORT_NAME = "broker";
    private static final String ZOOKEEPER_PORT_NAME = "client";
    private static final String ENGINE_PORT_NAME = "engine";
    private static final String ENGINE_SVC_PREFIX_TERMINATOR = "service-manager";
    private static final int DEFAULT_K_PORT = 0;
    private static final Logger LOG = LoggerFactory.getLogger(DNSBackedTopologySettings.class);
    private static final Duration LOOKUP_TIMEOUT = Duration.ofSeconds(5);
    private static final Executor LOOKUP_EXECUTOR = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(DNSBackedTopologySettings.class.getSimpleName() + "-%d").build());

    public static DnsSrvResolver createSrvResolver() {
        return DnsSrvResolvers.newBuilder().dnsLookupTimeoutMillis(LOOKUP_TIMEOUT.toMillis()).executor(LOOKUP_EXECUTOR).build();
    }

    @Nullable
    public abstract String getEngineServicePrefix();

    @Nullable
    public abstract String getZookeeperServiceName();

    public abstract String getKafkaServiceName();

    public abstract DnsSrvResolver getDnsSrvResolver();

    public abstract UnaryOperator<String> getServiceFQDNResolver();

    @Override // com.appian.komodo.topology.TopologySettings
    public boolean requiresServiceDiscovery() {
        return true;
    }

    @VisibleForTesting
    protected static DNSBackedTopologySettings fromParts(int i, ImmutableSet<EngineId> immutableSet, List<String> list, List<String> list2, int i2, int i3, String str, String str2, String str3, int i4, int i5, int i6, int i7, DnsSrvResolver dnsSrvResolver, UnaryOperator<String> unaryOperator, Engine... engineArr) {
        ensureAdminPortAvailable(i);
        return new AutoValue_DNSBackedTopologySettings(ImmutableSet.copyOf(engineArr), i, immutableSet, list, list2, true, i2, i3, i4, i5, i6, i7, str, str3, str2, dnsSrvResolver, unaryOperator);
    }

    public static TopologySettings fromConfig(Config config) {
        return fromConfig(config, true);
    }

    public static TopologySettings fromConfig(Config config, boolean z) {
        return fromConfig(config, z, DNSBackedTopologySettings::parseHosts);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v45, types: [java.util.List] */
    /* JADX WARN: Type inference failed for: r0v47, types: [java.util.List] */
    private static DNSBackedTopologySettings parseHosts(Config config, int i, int i2, int i3, int i4, int i5, int i6, int i7, boolean z) {
        ImmutableSet immutableSet = (ImmutableSet) config.getConfigList(TopologySettings.ENGINES_KEY).stream().map(Engine::fromConfig).collect(ImmutableCollectionCollectors.toImmutableSet());
        ImmutableSet immutableSet2 = config.hasPath(TopologySettings.CLUSTER_ENGINES_KEY) ? (ImmutableSet) config.getConfigList(TopologySettings.CLUSTER_ENGINES_KEY).stream().map(Engine::fromConfig).collect(ImmutableCollectionCollectors.toImmutableSet()) : immutableSet;
        ImmutableSet immutableSet3 = (ImmutableSet) immutableSet2.stream().map((v0) -> {
            return v0.getEngineId();
        }).distinct().collect(ImmutableCollectionCollectors.toImmutableSet());
        DnsSrvResolver createSrvResolver = createSrvResolver();
        String str = (String) immutableSet2.stream().findAny().map((v0) -> {
            return v0.getGatewayAddress();
        }).map((v0) -> {
            return v0.getHost();
        }).map(DNSBackedTopologySettings::getServiceFromTopologyEntry).map(str2 -> {
            return StringUtils.substringBeforeLast(str2, ENGINE_SVC_PREFIX_TERMINATOR) + ENGINE_SVC_PREFIX_TERMINATOR;
        }).orElse(null);
        String str3 = null;
        if (config.hasPath(TopologySettings.ZK_HOSTS_KEY)) {
            str3 = getServiceFromTopologyEntry((String) config.getStringList(TopologySettings.ZK_HOSTS_KEY).stream().findAny().get());
        }
        return new AutoValue_DNSBackedTopologySettings(immutableSet, i, immutableSet3, config.hasPath(TopologySettings.SOURCE_KAFKA_HOSTS_KEY) ? config.getStringList(TopologySettings.SOURCE_KAFKA_HOSTS_KEY) : ImmutableList.of(), config.hasPath(TopologySettings.MIRROR_MAKER_HOSTS_KEY) ? config.getStringList(TopologySettings.MIRROR_MAKER_HOSTS_KEY) : ImmutableList.of(), z, i2, i3, i4, i5, i6, i7, str, str3, getServiceFromTopologyEntry((String) config.getStringList(TopologySettings.KAFKA_HOSTS_KEY).stream().findAny().get()), createSrvResolver, DNSBackedTopologySettings::getServiceFQDNOrThrow);
    }

    @Override // com.appian.komodo.topology.TopologySettings
    public ImmutableSet<Engine> getClusterEngines() {
        return (ImmutableSet) getClusterEngineIds().stream().flatMap(engineId -> {
            try {
                return getEndpointsForEngine(getDnsSrvResolver(), getEngineServicePrefix(), engineId, getServiceFQDNResolver()).stream().map(str -> {
                    return Engine.fromParts(engineId.getName(), engineId.getShard(), HostAndPort.fromString(str), HostAndPort.fromParts(StringUtils.substringBefore(str, HOST_PORT_SEPARATOR), 0));
                });
            } catch (Exception e) {
                LOG.warn("Unable to get latest endpoints for {}, using {} due to {}", new Object[]{engineId, Collections.emptyList(), e});
                return Stream.empty();
            }
        }).collect(ImmutableCollectionCollectors.toImmutableSet());
    }

    @Override // com.appian.komodo.topology.TopologySettings
    public List<String> getClusterKomodoHosts() {
        return (List) getClusterEngines().stream().map((v0) -> {
            return v0.getGatewayAddress();
        }).map((v0) -> {
            return v0.getHost();
        }).distinct().collect(Collectors.toList());
    }

    @Override // com.appian.komodo.topology.TopologySettings
    public List<String> getZookeeperHosts() {
        if (getZookeeperServiceName() == null) {
            return Collections.emptyList();
        }
        try {
            return getEndpointsForZookeeper(getDnsSrvResolver(), getZookeeperServiceName(), getServiceFQDNResolver());
        } catch (Exception e) {
            LOG.warn("Unable to get latest endpoints for zookeeper, using {} due to {}", Collections.emptyList(), e);
            return Collections.emptyList();
        }
    }

    @Override // com.appian.komodo.topology.TopologySettings, com.appian.komodo.topology.KafkaTopology
    public List<String> getKafkaHosts() {
        try {
            return getEndpointsForKafka(getDnsSrvResolver(), getKafkaServiceName(), getServiceFQDNResolver());
        } catch (Exception e) {
            LOG.warn("Unable to get latest endpoints for kafka, using {} due to {}", Collections.emptyList(), e);
            return Collections.emptyList();
        }
    }

    @Override // com.appian.komodo.topology.KafkaTopology
    public List<String> getSimplifiedKafkaHosts() {
        return (List) getKafkaHosts().stream().map(DNSBackedTopologySettings::getSimplifiedConnectionString).collect(Collectors.toList());
    }

    public static String getSimplifiedConnectionString(String str) {
        HostAndPort fromString = HostAndPort.fromString(str);
        return StringUtils.joinWith(HOST_PORT_SEPARATOR, new Object[]{fromString.getHost().substring(0, fromString.getHost().indexOf(".", fromString.getHost().indexOf(".") + 1)), Integer.valueOf(fromString.getPort())});
    }

    @VisibleForTesting
    protected int getBrokerIdFromPodName(String str) {
        return Integer.parseInt(str.substring(str.lastIndexOf(45) + 1));
    }

    @Override // com.appian.komodo.topology.TopologySettings
    public int getLocalBrokerId() {
        String str = System.getenv("POD_NAME");
        Preconditions.checkNotNull(str, "DNS backed topology requires Kubernetes");
        return getBrokerIdFromPodName(str);
    }

    public static String getServiceFromTopologyEntry(String str) {
        return StringUtils.substringBefore(StringUtils.substringAfter(str, "."), HOST_PORT_SEPARATOR);
    }

    private static String createSrvName(String str, String str2) {
        return String.join(".", "_" + str, "_tcp", str2);
    }

    public static List<String> getEndpointsForEngine(DnsSrvResolver dnsSrvResolver, String str, EngineId engineId) throws ExecutionException, InterruptedException, TimeoutException {
        return getEndpointsForEngine(dnsSrvResolver, str, engineId, DNSBackedTopologySettings::getServiceFQDNOrThrow);
    }

    public static List<String> getEndpointsForEngine(DnsSrvResolver dnsSrvResolver, String str, EngineId engineId, UnaryOperator<String> unaryOperator) throws ExecutionException, InterruptedException, TimeoutException {
        return getEndpointsBySRVRecordName(dnsSrvResolver, createSrvName(ENGINE_PORT_NAME, (String) unaryOperator.apply(str + "-" + engineId.getExternalId() + "-" + HEADLESS_SUFFIX)));
    }

    private static List<String> getEndpointsForKafka(DnsSrvResolver dnsSrvResolver, String str, UnaryOperator<String> unaryOperator) throws ExecutionException, InterruptedException, TimeoutException {
        return getEndpointsBySRVRecordName(dnsSrvResolver, createSrvName(KAFKA_PORT_NAME, (String) unaryOperator.apply(str)));
    }

    private static List<String> getEndpointsForZookeeper(DnsSrvResolver dnsSrvResolver, String str, UnaryOperator<String> unaryOperator) throws ExecutionException, InterruptedException, TimeoutException {
        return getEndpointsBySRVRecordName(dnsSrvResolver, createSrvName(ZOOKEEPER_PORT_NAME, (String) unaryOperator.apply(str)));
    }

    private static List<String> getEndpointsBySRVRecordName(DnsSrvResolver dnsSrvResolver, String str) throws ExecutionException, InterruptedException, TimeoutException {
        return (List) ((List) dnsSrvResolver.resolveAsync(str).toCompletableFuture().get(LOOKUP_TIMEOUT.toMillis() * 2, TimeUnit.MILLISECONDS)).stream().map(lookupResult -> {
            return lookupResult.host() + HOST_PORT_SEPARATOR + lookupResult.port();
        }).collect(Collectors.toList());
    }

    public static String getServiceFQDNOrThrow(String str) {
        try {
            return getServiceFQDNFromRandomFQDN(str, InetAddress.getByName(str).getCanonicalHostName());
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    protected static String getServiceFQDNFromRandomFQDN(String str, String str2) {
        return StringUtils.joinWith(".", new Object[]{str, StringUtils.substringAfter(StringUtils.substringAfter(str2, "."), ".")});
    }

    @Override // com.appian.komodo.topology.TopologySettings
    public boolean isDns() {
        return true;
    }
}
