package com.appiancorp.process.kafka;

import io.prometheus.client.Collector;
import io.prometheus.client.Gauge;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/process/kafka/KafkaChunkedProducer.class */
public final class KafkaChunkedProducer implements AutoCloseable {
    public static final int STANDARD_HEADER_SIZE = 12;
    private static final int TOO_SMALL_TO_COMPRESS_IN_BYTES = 16;
    private final KafkaProducer<String, byte[]> kafkaProducer;
    private final int messageSizeMaxInBytes;
    private static final Logger LOG = Logger.getLogger(KafkaChunkedProducer.class);
    private static final Gauge outgoingByteRate = Gauge.build().namespace("appian").subsystem("process_history").name("outgoing_bytes_per_second_rate").labelNames(new String[]{"producer_metric_group"}).help("Outgoing bytes per second to Kafka for process history.").register();

    public List<Collector.MetricFamilySamples> getOutgoingByteRateMetrics() {
        return outgoingByteRate.collect();
    }

    public KafkaChunkedProducer(KafkaProducer<String, byte[]> kafkaProducer, int i) {
        if (kafkaProducer == null) {
            throw new NullPointerException("kafkaProducer");
        }
        if (i <= 0) {
            throw new IllegalArgumentException("messageSizeMaxInBytes must be positive");
        }
        this.kafkaProducer = kafkaProducer;
        this.messageSizeMaxInBytes = i;
    }

    public static KafkaChunkedProducer of(KafkaProducer kafkaProducer) {
        return of(kafkaProducer, AppianKafkaProducerFactory.getInstance().calculateMessageSizeMaxInBytes());
    }

    public static KafkaChunkedProducer of(KafkaProducer kafkaProducer, int i) {
        return new KafkaChunkedProducer(kafkaProducer, i);
    }

    public static CreatedTopic createTopicIfNecessary(String str, int i) throws InterruptedException, ExecutionException {
        CreatedTopic createTopic = AppianKafkaAdminClientFactory.createTopic(str, i);
        if (LOG.isInfoEnabled()) {
            LOG.info("Automatically created Kafka topic [" + createTopic + "]");
        }
        return createTopic;
    }

    public KafkaProducer<String, byte[]> getKafkaProducer() {
        return this.kafkaProducer;
    }

    public int getMessageSizeMaxInBytes() {
        return this.messageSizeMaxInBytes;
    }

    public KafkaChunkedSendResult send(String str, int i, KafkaTransactionId kafkaTransactionId, byte[] bArr) throws ExecutionException, InterruptedException {
        return send(str, i, kafkaTransactionId, bArr, System.currentTimeMillis(), null);
    }

    public KafkaChunkedSendResult send(String str, int i, KafkaTransactionId kafkaTransactionId, byte[] bArr, long j) throws ExecutionException, InterruptedException {
        return send(str, i, kafkaTransactionId, bArr, j, null);
    }

    private static void validateKafkaMessage(byte[] bArr) {
        if (bArr == null) {
            throw new NullPointerException("kafkaMessage");
        }
    }

    private int validateDataSizePerMessageInBytes(int i) {
        int i2 = this.messageSizeMaxInBytes - i;
        if (i2 <= 0) {
            throw new IllegalArgumentException("header size [" + i + "] and max kafka message size [" + this.messageSizeMaxInBytes + "] dataSizePerMessageInBytes have no remaining room for body");
        }
        return i2;
    }

    private void throwIfFutureFailure(Future<RecordMetadata> future) throws ExecutionException, InterruptedException {
        if ((future instanceof FutureRecordMetadata) || !future.isDone()) {
            return;
        }
        future.get();
    }

    private Future<RecordMetadata> sendSingleChunk(ProducerRecord<String, byte[]> producerRecord, List<Future<RecordMetadata>> list, Callback callback) throws ExecutionException, InterruptedException {
        Future<RecordMetadata> send = this.kafkaProducer.send(producerRecord, callback);
        if (send == null) {
            throw new NullPointerException("Future<RecordMetadata> should never be null");
        }
        throwIfFutureFailure(send);
        list.add(send);
        recordOutgoingByteRateMetricForPrometheus();
        return send;
    }

    private void recordOutgoingByteRateMetricForPrometheus() {
        if (this.kafkaProducer == null) {
            return;
        }
        Map metrics = this.kafkaProducer.metrics();
        for (Map.Entry entry : metrics.entrySet()) {
            if (((MetricName) entry.getKey()).name().equals("outgoing-byte-rate")) {
                ((Gauge.Child) outgoingByteRate.labels(new String[]{((MetricName) entry.getKey()).group()})).set(((Double) ((Metric) metrics.get(entry.getKey())).metricValue()).doubleValue());
            }
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:10:0x003f  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private java.util.concurrent.Future<org.apache.kafka.clients.producer.RecordMetadata> sendSingleChunkWithRetryOnCreate(java.lang.String r9, int r10, java.lang.String r11, byte[] r12, long r13, java.util.List<java.util.concurrent.Future<org.apache.kafka.clients.producer.RecordMetadata>> r15, org.apache.kafka.clients.producer.Callback r16) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException {
        /*
            r8 = this;
            org.apache.kafka.clients.producer.ProducerRecord r0 = new org.apache.kafka.clients.producer.ProducerRecord
            r1 = r0
            r2 = r9
            r3 = r10
            java.lang.Integer r3 = java.lang.Integer.valueOf(r3)
            r4 = r13
            java.lang.Long r4 = java.lang.Long.valueOf(r4)
            r5 = r11
            r6 = r12
            r1.<init>(r2, r3, r4, r5, r6)
            r17 = r0
            r0 = r8
            r1 = r17
            r2 = r15
            r3 = r16
            java.util.concurrent.Future r0 = r0.sendSingleChunk(r1, r2, r3)     // Catch: org.apache.kafka.common.errors.TimeoutException -> L21 java.util.concurrent.ExecutionException -> L26
            return r0
        L21:
            r18 = move-exception
            goto L36
        L26:
            r18 = move-exception
            r0 = r18
            java.lang.Throwable r0 = r0.getCause()
            boolean r0 = r0 instanceof org.apache.kafka.common.errors.TimeoutException
            if (r0 != 0) goto L36
            r0 = r18
            throw r0
        L36:
            org.apache.log4j.Logger r0 = com.appiancorp.process.kafka.KafkaChunkedProducer.LOG
            boolean r0 = r0.isInfoEnabled()
            if (r0 == 0) goto L5d
            org.apache.log4j.Logger r0 = com.appiancorp.process.kafka.KafkaChunkedProducer.LOG
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "Could not send to topic ["
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r9
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = "], will try to create topic then retry send"
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.info(r1)
        L5d:
            r0 = r9
            r1 = r10
            com.appiancorp.process.kafka.CreatedTopic r0 = createTopicIfNecessary(r0, r1)
            r0 = r8
            r1 = r17
            r2 = r15
            r3 = r16
            java.util.concurrent.Future r0 = r0.sendSingleChunk(r1, r2, r3)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: com.appiancorp.process.kafka.KafkaChunkedProducer.sendSingleChunkWithRetryOnCreate(java.lang.String, int, java.lang.String, byte[], long, java.util.List, org.apache.kafka.clients.producer.Callback):java.util.concurrent.Future");
    }

    private void prepareChunkOfBytesToSend(byte[] bArr, int i, byte[] bArr2, KafkaTransactionId kafkaTransactionId, int i2, int i3, int i4) {
        IntEncoderDecoder.store(i2, bArr2, 4);
        IntEncoderDecoder.store(i3, bArr2, 8);
        kafkaTransactionId.copyInto(bArr2);
        System.arraycopy(bArr, i, bArr2, 12 + kafkaTransactionId.headerSizeInBytesNotIncludingStandardHeaders(), i4);
    }

    public static byte[] compress(byte[] bArr) {
        byte[] compress;
        int length = bArr.length;
        if (length >= 16 && (compress = ByteArrayCompression.compress(bArr)) != null && compress.length < length) {
            return compress;
        }
        return null;
    }

    public KafkaChunkedSendResult send(String str, int i, KafkaTransactionId kafkaTransactionId, byte[] bArr, long j, Callback callback) throws ExecutionException, InterruptedException {
        byte[] bArr2;
        validateKafkaMessage(bArr);
        int type = kafkaTransactionId.getType();
        byte[] compress = kafkaTransactionId.isCompressed() ? compress(bArr) : null;
        if (compress != null) {
            bArr2 = compress;
            type |= Integer.MIN_VALUE;
        } else {
            bArr2 = bArr;
        }
        int headerSizeInBytesNotIncludingStandardHeaders = 12 + kafkaTransactionId.headerSizeInBytesNotIncludingStandardHeaders();
        int validateDataSizePerMessageInBytes = validateDataSizePerMessageInBytes(headerSizeInBytesNotIncludingStandardHeaders);
        int length = bArr2.length;
        int ceil = (int) Math.ceil(length / validateDataSizePerMessageInBytes);
        ArrayList arrayList = new ArrayList();
        int i2 = length;
        int i3 = 0;
        int i4 = 0;
        if (i2 > validateDataSizePerMessageInBytes) {
            byte[] bArr3 = new byte[headerSizeInBytesNotIncludingStandardHeaders + validateDataSizePerMessageInBytes];
            IntEncoderDecoder.store(type, bArr3, 0);
            while (i2 > validateDataSizePerMessageInBytes) {
                prepareChunkOfBytesToSend(bArr2, i4, bArr3, kafkaTransactionId, i3, ceil, validateDataSizePerMessageInBytes);
                sendSingleChunkWithRetryOnCreate(str, i, kafkaTransactionId.getKey(i3), (byte[]) bArr3.clone(), j, arrayList, callback);
                i3++;
                i4 += validateDataSizePerMessageInBytes;
                i2 -= validateDataSizePerMessageInBytes;
            }
        }
        byte[] bArr4 = new byte[headerSizeInBytesNotIncludingStandardHeaders + i2];
        IntEncoderDecoder.store(type, bArr4, 0);
        prepareChunkOfBytesToSend(bArr2, i4, bArr4, kafkaTransactionId, i3, ceil, i2);
        sendSingleChunkWithRetryOnCreate(str, i, kafkaTransactionId.getKey(i3), bArr4, j, arrayList, callback);
        return new KafkaChunkedSendResult(this, arrayList, ceil);
    }

    public static RecordMetadata[] await(List<Future<RecordMetadata>> list, long j, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
        if (j < 0) {
            throw new IllegalArgumentException("negative timeout");
        }
        long millis = timeUnit.toMillis(j);
        long currentTimeMillis = System.currentTimeMillis() + millis;
        int size = list.size();
        RecordMetadata[] recordMetadataArr = new RecordMetadata[size];
        boolean z = false;
        for (int i = 0; i < size; i++) {
            Future<RecordMetadata> future = list.get(i);
            long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
            if (currentTimeMillis2 < 0) {
                if (!z) {
                    throw new TimeoutException("Read [" + i + "] of [" + size + "]");
                }
                currentTimeMillis2 = millis;
            }
            recordMetadataArr[i] = future.get(currentTimeMillis2, TimeUnit.MILLISECONDS);
            z = true;
        }
        return recordMetadataArr;
    }

    public static RecordMetadata[] await(List<Future<RecordMetadata>> list) throws ExecutionException, InterruptedException {
        int size = list.size();
        RecordMetadata[] recordMetadataArr = new RecordMetadata[size];
        for (int i = 0; i < size; i++) {
            recordMetadataArr[i] = list.get(i).get();
        }
        return recordMetadataArr;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        flush();
        this.kafkaProducer.close();
    }

    public void flush() {
        this.kafkaProducer.flush();
    }
}
