package com.appiancorp.process.history;

import com.appiancorp.core.expr.portable.JsonContext;
import com.appiancorp.core.expr.portable.JsonContextBuilder;
import com.appiancorp.process.kafka.AppianKafkaProducerFactory;
import com.appiancorp.process.kafka.KafkaChunkedProducer;
import com.appiancorp.process.kafka.KafkaChunkedSendResult;
import com.appiancorp.process.kafka.KafkaTransactionId;
import com.appiancorp.type.Diff;
import io.prometheus.client.Counter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/process/history/ProcessHistoryKafkaProducer.class */
public class ProcessHistoryKafkaProducer implements AutoCloseable {
    protected static final Logger LOG = Logger.getLogger(ProcessHistoryKafkaProducer.class);
    public static final JsonContext JSON_CONTEXT = new JsonContext(new JsonContextBuilder().setJsonConstants(Diff.JSON_CONSTANTS));
    private static final Counter serializationErrorCounter = Counter.build().namespace("appian").subsystem("process_history").name("kafka_serialization_error_total").help("This counter tracks the number of errors serializing the data to be sent to Kafka for process history.").register();
    private static final AtomicReference<ProcessHistoryKafkaProducer> processHistoryKafkaProducer = new AtomicReference<>();
    private final KafkaChunkedProducer kafkaChunkedProducer;
    private boolean singleton;

    protected ProcessHistoryKafkaProducer(boolean z) {
        this.kafkaChunkedProducer = KafkaChunkedProducer.of(AppianKafkaProducerFactory.generateKafkaProducer());
        this.singleton = z;
    }

    protected ProcessHistoryKafkaProducer() {
        this(false);
    }

    public static ProcessHistoryKafkaProducer getInstance() {
        ProcessHistoryKafkaProducer processHistoryKafkaProducer2 = processHistoryKafkaProducer.get();
        if (processHistoryKafkaProducer2 != null) {
            return processHistoryKafkaProducer2;
        }
        ProcessHistoryKafkaProducer processHistoryKafkaProducer3 = new ProcessHistoryKafkaProducer(true);
        if (!processHistoryKafkaProducer.compareAndSet(null, processHistoryKafkaProducer3)) {
            try {
                processHistoryKafkaProducer3.singleton = false;
                processHistoryKafkaProducer3.close();
            } catch (Throwable th) {
                LOG.error("Could not close temporary ProcessHistoryKafkaProducer", th);
            }
        }
        return processHistoryKafkaProducer.get();
    }

    public byte[] prepareJsonBytes(ProcessHistoryRow processHistoryRow) {
        try {
            return processHistoryRow.toJsonByteArray(JSON_CONTEXT);
        } catch (Exception e) {
            serializationErrorCounter.inc();
            LOG.error("Could not prepare process history, will not retry", e);
            throw new JsonMalformedRuntimeException(e);
        }
    }

    public int getMessageSizeMaxInBytes() {
        return this.kafkaChunkedProducer.getMessageSizeMaxInBytes();
    }

    public KafkaChunkedSendResult sendAsynchronous(String str, int i, ProcessHistoryRow processHistoryRow) throws ExecutionException, InterruptedException {
        return sendAsynchronous(str, i, processHistoryRow, prepareJsonBytes(processHistoryRow));
    }

    public KafkaChunkedSendResult sendAsynchronous(String str, int i, ProcessHistoryRow processHistoryRow, byte[] bArr) throws ExecutionException, InterruptedException {
        return sendAsynchronous(str, i, processHistoryRow.kafkaTransactionId(), bArr, processHistoryRow.time());
    }

    public KafkaChunkedSendResult sendAsynchronous(String str, int i, KafkaTransactionId kafkaTransactionId, byte[] bArr, long j) throws ExecutionException, InterruptedException {
        return this.kafkaChunkedProducer.send(str, i, kafkaTransactionId, bArr, j);
    }

    public RecordMetadata[] send(String str, int i, ProcessHistoryRow processHistoryRow) throws ExecutionException, InterruptedException {
        return sendAsynchronous(str, i, processHistoryRow).get();
    }

    public RecordMetadata[] send(String str, int i, ProcessHistoryRow processHistoryRow, byte[] bArr) throws ExecutionException, InterruptedException {
        return sendAsynchronous(str, i, processHistoryRow, bArr).get();
    }

    public KafkaChunkedProducer getKafkaChunkedProducer() {
        return this.kafkaChunkedProducer;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        flush();
        if (this.singleton) {
            return;
        }
        this.kafkaChunkedProducer.close();
    }

    public void flush() {
        this.kafkaChunkedProducer.flush();
    }
}
