package com.appiancorp.process.engine;

import com.appiancorp.core.expr.Id;
import com.appiancorp.core.util.PortableArrayUtils;
import com.appiancorp.object.AppianThreadFactory;
import com.appiancorp.process.execution.KafkaExecutionConfiguration;
import com.appiancorp.process.history.ProcessHistoryKafkaProducer;
import com.appiancorp.process.history.ProcessHistoryRow;
import com.appiancorp.process.kafka.AppianKafkaAdminClientFactory;
import com.appiancorp.process.kafka.AppianKafkaTopic;
import com.appiancorp.process.kafka.CreatedTopic;
import com.appiancorp.process.kafka.KafkaChunkedSendResult;
import com.appiancorp.process.kafka.KafkaTopicPrefixes;
import com.appiancorp.process.kafka.KafkaTransactionId;
import com.appiancorp.suite.cfg.ConfigurationFactory;
import com.appiancorp.suite.cfg.FeatureToggleConfiguration;
import com.appiancorp.suiteapi.process.TypedVariable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.client.Counter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

@SuppressFBWarnings({"SE_BAD_FIELD"})
/* loaded from: input_file:com/appiancorp/process/engine/ProcessHistoryRequest.class */
public class ProcessHistoryRequest extends ContinuationRequest implements UnattendedRequest {
    public static final int NOT_FOUND = -1;
    public static final String KAFKA_OFFSET_ERROR_STRING = "offsetError";
    public static final String KAFKA_SEND_ERROR_STRING = "sendError";
    protected String originalTopic;
    protected String topic;
    protected ProcessHistoryRow[] processHistoryRows;
    protected boolean initializeTopic;
    protected boolean duplicate;
    protected Integer partition;
    protected Long processId;
    protected long minOffset = Long.MAX_VALUE;
    protected long maxOffset = Long.MIN_VALUE;
    protected int maxCountOfChunks = Integer.MIN_VALUE;
    protected boolean setOffsets;
    private static final int RETRY_WRITE_COUNT = 10;
    private static final long RETRY_WRITE_MS = 250;
    protected static final Logger LOG = Logger.getLogger(ProcessHistoryRequest.class);
    protected static final Counter writeToKafkaErrorCounter = Counter.build().namespace("appian").subsystem("process_history").name("kafka_error_counter_total").help("This counter tracks any issues sending any information to Kafka with the processHistoryKafkaProducer").labelNames(new String[]{"type"}).register();
    private static final Map<String, TopicPartitionTracker> topicPartitionTrackerMap = new HashMap();
    private static final AtomicInteger topicPartitionAssignment = new AtomicInteger();
    private static final long boot = System.currentTimeMillis();
    private static final PriorityBlockingQueue<Runnable> priorityQueue = new PriorityBlockingQueue<>();
    private static final AppianThreadFactory appianThreadFactory = new AppianThreadFactory("ProcessHistoryRequest", (FeatureToggleConfiguration) ConfigurationFactory.getConfiguration(FeatureToggleConfiguration.class));
    private static final ExecutorService executorServive = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, priorityQueue, appianThreadFactory);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/appiancorp/process/engine/ProcessHistoryRequest$TopicPartitionTracker.class */
    public static class TopicPartitionTracker {
        private final long expireTs = System.currentTimeMillis() + ProcessHistoryRequest.access$100().getRedeterminePartitionsMs();
        private final Set<Integer> partitions;

        public TopicPartitionTracker(Set<Integer> set) {
            this.partitions = set;
        }

        public boolean expired() {
            return System.currentTimeMillis() > this.expireTs || this.partitions.isEmpty();
        }

        public Set<Integer> getPartitions() {
            return this.partitions;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/appiancorp/process/engine/ProcessHistoryRequest$WriteRequest.class */
    public class WriteRequest implements Comparable, Runnable {
        private final ProcessHistoryKafkaProducer producer;
        private final KafkaTransactionId[] kafkaTransactionIds;
        private final long[] time;
        private final byte[][] preparedProcessHistoryRows;
        private final long priority;
        private final CompletableFuture<KafkaChunkedSendResult[]> completableFuture = new CompletableFuture<>();

        public WriteRequest(ProcessHistoryKafkaProducer processHistoryKafkaProducer, int i, KafkaTransactionId[] kafkaTransactionIdArr, long[] jArr, byte[][] bArr) {
            this.kafkaTransactionIds = kafkaTransactionIdArr;
            this.time = jArr;
            this.priority = (System.currentTimeMillis() - ProcessHistoryRequest.boot) + i;
            this.producer = processHistoryKafkaProducer;
            this.preparedProcessHistoryRows = bArr;
        }

        public Future<KafkaChunkedSendResult[]> getFuture() {
            return this.completableFuture;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.completableFuture.complete(write());
            } catch (Throwable th) {
                this.completableFuture.completeExceptionally(th);
            }
        }

        public KafkaChunkedSendResult[] write() throws ExecutionException, InterruptedException {
            int length = this.kafkaTransactionIds.length;
            KafkaChunkedSendResult[] kafkaChunkedSendResultArr = new KafkaChunkedSendResult[length];
            for (int i = 0; i < length; i++) {
                kafkaChunkedSendResultArr[i] = ProcessHistoryRequest.this.writeProcessHistoryRowToKafka(this.producer, this.kafkaTransactionIds[i], this.preparedProcessHistoryRows[i], this.time[i]);
            }
            this.producer.flush();
            ProcessHistoryRequest.this.waitForOffsets(kafkaChunkedSendResultArr);
            return kafkaChunkedSendResultArr;
        }

        @Override // java.lang.Comparable
        public int compareTo(Object obj) {
            if (obj instanceof WriteRequest) {
                return Long.compare(this.priority, ((WriteRequest) obj).priority);
            }
            return -1;
        }

        public boolean equals(Object obj) {
            return (obj instanceof WriteRequest) && this.priority == ((WriteRequest) obj).priority;
        }

        public int hashCode() {
            return (int) this.priority;
        }
    }

    public void merge(ProcessHistoryRequest processHistoryRequest) {
        if (processHistoryRequest == null || processHistoryRequest.getProcessHistoryRows() == null) {
            return;
        }
        if (this.processHistoryRows == null) {
            this.topic = processHistoryRequest.getTopic();
            this.processHistoryRows = processHistoryRequest.getProcessHistoryRows();
        } else {
            if (!Objects.equals(this.topic, processHistoryRequest.getTopic())) {
                throw new RuntimeException("topic must be the same to merge");
            }
            this.processHistoryRows = (ProcessHistoryRow[]) PortableArrayUtils.appendAllInto(this.processHistoryRows, processHistoryRequest.getProcessHistoryRows(), new ProcessHistoryRow[this.processHistoryRows.length + processHistoryRequest.getProcessHistoryRows().length]);
        }
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.originalTopic = str;
        this.topic = KafkaTopicPrefixes.PROCESS_HISTORY.transform(str);
    }

    public Integer getPartition() {
        return this.partition;
    }

    public void setPartition(Integer num) {
        this.partition = num;
    }

    protected double getWriteToKafkaErrorCounter(String str) {
        return ((Counter.Child) writeToKafkaErrorCounter.labels(new String[]{str})).get();
    }

    public String getOriginalTopic() {
        return this.originalTopic;
    }

    public int getMaxCountOfChunks() {
        return this.maxCountOfChunks;
    }

    private boolean isInitializeTopic() {
        return this.initializeTopic;
    }

    public void setInitializeTopic(boolean z) {
        this.initializeTopic = z;
    }

    public boolean isDuplicate() {
        return this.duplicate;
    }

    public void setDuplicate(boolean z) {
        this.duplicate = z;
    }

    public ProcessHistoryRow[] getProcessHistoryRows() {
        return this.processHistoryRows;
    }

    public void setProcessHistoryRows(ProcessHistoryRow[] processHistoryRowArr) {
        this.processHistoryRows = processHistoryRowArr;
    }

    private static KafkaExecutionConfiguration getKafkaExecutionConfiguration() {
        return (KafkaExecutionConfiguration) ConfigurationFactory.getConfiguration(KafkaExecutionConfiguration.class);
    }

    protected CreatedTopic initTopicIfNecessary() throws ExecutionException, InterruptedException {
        if (!this.initializeTopic) {
            return null;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("initialize topic [" + this.topic + "] requested");
        }
        return AppianKafkaAdminClientFactory.createTopic(this.topic);
    }

    protected long determineMinOffset(RecordMetadata[] recordMetadataArr) {
        long j = Long.MAX_VALUE;
        try {
            j = KafkaChunkedSendResult.getMinimumOffset(recordMetadataArr);
            if (j >= 0 && j < this.minOffset) {
                this.minOffset = j;
                this.setOffsets = true;
            }
        } catch (Throwable th) {
            incrementWriteToKafkaErrorCounter(KAFKA_OFFSET_ERROR_STRING);
            LOG.error("Error setting minimum offsets for process history data written to Kafka", th);
        }
        return j;
    }

    protected long determineMaxOffset(RecordMetadata[] recordMetadataArr) {
        long j = Long.MIN_VALUE;
        try {
            j = KafkaChunkedSendResult.getMaximumOffset(recordMetadataArr);
            if (j > this.maxOffset) {
                this.maxOffset = j;
                this.setOffsets = true;
            }
        } catch (Throwable th) {
            incrementWriteToKafkaErrorCounter(KAFKA_OFFSET_ERROR_STRING);
            LOG.error("Error setting maximum offsets for process history data written to Kafka", th);
        }
        return j;
    }

    protected void waitForOffsets(KafkaChunkedSendResult kafkaChunkedSendResult) throws ExecutionException, InterruptedException {
        if (kafkaChunkedSendResult == null) {
            return;
        }
        try {
            RecordMetadata[] recordMetadataArr = kafkaChunkedSendResult.get();
            if (determineMinOffset(recordMetadataArr) > determineMaxOffset(recordMetadataArr)) {
                try {
                    incrementWriteToKafkaErrorCounter(KAFKA_OFFSET_ERROR_STRING);
                    LOG.error("Could not determine offset when writing process history to Kafka for process [" + this.processId + "] on topic [" + this.topic + "] partition [" + this.partition + "]");
                } catch (Throwable th) {
                    LOG.error("Error recording KAFKA_OFFSET_ERROR_STRING metrics", th);
                }
            }
        } catch (InterruptedException | RuntimeException | ExecutionException e) {
            incrementWriteToKafkaErrorCounter(KAFKA_OFFSET_ERROR_STRING);
            LOG.error("Could not determine offset when examining Kafka chunked send result for process [" + this.processId + "] on topic [" + this.topic + "] partition [" + this.partition + "]");
            throw e;
        }
    }

    protected void waitForOffsets(KafkaChunkedSendResult[] kafkaChunkedSendResultArr) throws ExecutionException, InterruptedException {
        if (kafkaChunkedSendResultArr == null) {
            return;
        }
        for (KafkaChunkedSendResult kafkaChunkedSendResult : kafkaChunkedSendResultArr) {
            waitForOffsets(kafkaChunkedSendResult);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaChunkedSendResult writeProcessHistoryRowToKafka(ProcessHistoryKafkaProducer processHistoryKafkaProducer, KafkaTransactionId kafkaTransactionId, byte[] bArr, long j) throws ExecutionException, InterruptedException {
        if (bArr == null) {
            return null;
        }
        KafkaChunkedSendResult sendAsynchronous = processHistoryKafkaProducer.sendAsynchronous(this.topic, this.partition.intValue(), kafkaTransactionId, bArr, j);
        this.maxCountOfChunks = Math.max(this.maxCountOfChunks, sendAsynchronous.getCountOfChunks());
        return sendAsynchronous;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13, types: [byte[], byte[][]] */
    public void writeAllToKafka() throws ExecutionException, InterruptedException {
        if (this.processHistoryRows == null || this.processHistoryRows.length == 0) {
            MigrateProcessHistoryDryRunResponse.logTotalsRequest(this.originalTopic);
            return;
        }
        ProcessHistoryKafkaProducer processHistoryKafkaProducer = ProcessHistoryKafkaProducer.getInstance();
        int length = this.processHistoryRows.length;
        if (length == 0) {
            return;
        }
        ?? r0 = new byte[length];
        int i = 0;
        int orderOfProcessHistoryRows = orderOfProcessHistoryRows(this.processHistoryRows);
        KafkaTransactionId[] kafkaTransactionIdArr = new KafkaTransactionId[length];
        long[] jArr = new long[length];
        for (int i2 = 0; i2 < length; i2++) {
            ProcessHistoryRow processHistoryRow = this.processHistoryRows[i2];
            kafkaTransactionIdArr[i2] = processHistoryRow.kafkaTransactionId();
            jArr[i2] = processHistoryRow.time();
        }
        if (this.processId == null) {
            this.processId = this.processHistoryRows[0].getProcessId();
        }
        for (int i3 = 0; i3 < length; i3++) {
            try {
                r0[i3] = processHistoryKafkaProducer.prepareJsonBytes(this.processHistoryRows[i3]);
                this.processHistoryRows[i3] = null;
                i++;
            } catch (Exception e) {
            }
        }
        if (i == 0) {
            LOG.debug("writeAllToKafka has no rows to write after JSON preparation, so skipping");
        } else {
            writePreparedDataToKafka(processHistoryKafkaProducer, orderOfProcessHistoryRows, kafkaTransactionIdArr, jArr, r0);
            LOG.debug("writeAllToKafka completed");
        }
    }

    private static int orderOfProcessHistoryRows(ProcessHistoryRow[] processHistoryRowArr) {
        Integer order = processHistoryRowArr[0].getOrder();
        return order != null ? order.intValue() : TypedVariable.MAX_TYPE;
    }

    private void writePreparedDataToKafka(ProcessHistoryKafkaProducer processHistoryKafkaProducer, int i, KafkaTransactionId[] kafkaTransactionIdArr, long[] jArr, byte[][] bArr) throws ExecutionException, InterruptedException {
        long j;
        int i2 = 0;
        long j2 = RETRY_WRITE_MS;
        while (true) {
            try {
                j = j2;
                WriteRequest writeRequest = new WriteRequest(processHistoryKafkaProducer, i, kafkaTransactionIdArr, jArr, bArr);
                executorServive.execute(writeRequest);
                writeRequest.getFuture().get();
                return;
            } catch (InterruptedException | RuntimeException | ExecutionException e) {
                incrementWriteToKafkaErrorCounter(KAFKA_SEND_ERROR_STRING);
                i2++;
                if (i2 >= 10) {
                    LOG.error("Could not send ProcessHistoryRequest to Kafka (retryCount [" + i2 + "] from Java), will retry later from work queue", e);
                    throw e;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Could not send ProcessHistoryRequest to Kafka (retryCount [" + i2 + "] from Java), will retry after [" + RETRY_WRITE_MS + "] ms", e);
                }
                sleep(j);
                j2 = (long) (j * 1.5d);
            }
        }
    }

    protected void incrementWriteToKafkaErrorCounter(String str) {
        ((Counter.Child) writeToKafkaErrorCounter.labels(new String[]{str})).inc();
    }

    private static void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    protected ProcessHistoryResponse response() {
        return this.setOffsets ? new ProcessHistoryResponse(this, this.minOffset, this.maxOffset, this.maxCountOfChunks) : new ProcessHistoryResponse(this, this.maxCountOfChunks);
    }

    @Override // com.appiancorp.process.engine.ProcessActionRequest
    public int getType() {
        return 140;
    }

    @Override // com.appiancorp.process.engine.ContinuationRequest, com.appiancorp.process.engine.UnattendedRequest
    public String getRunAsUsername() {
        return "Administrator";
    }

    @Override // com.appiancorp.process.engine.ContinuationRequest
    public ProcessHistoryResponse execute0() {
        try {
            this.partition = Integer.valueOf(assignPartition(initTopicIfNecessary()));
            writeAllToKafka();
            return response();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private int assignPartition(CreatedTopic createdTopic) throws ExecutionException, InterruptedException {
        Integer[] numArr;
        if (this.partition != null) {
            return this.partition.intValue();
        }
        if (createdTopic != null) {
            int partitionCount = createdTopic.getPartitionCount();
            if (LOG.isInfoEnabled()) {
                LOG.info("Assigning partition from INITIAL creation of [" + partitionCount + "] partitions");
            }
            numArr = new Integer[partitionCount];
            for (int i = 0; i < partitionCount; i++) {
                numArr[i] = Integer.valueOf(i);
            }
        } else {
            numArr = (Integer[]) determinePartitionsFromKafka(this.topic).toArray(new Integer[0]);
            if (LOG.isInfoEnabled()) {
                LOG.info("Assigning partition from initial creation of EXISTING [" + numArr.length + "] partitions");
            }
        }
        Long processId = getProcessId();
        int length = numArr.length;
        if (length == 0) {
            this.partition = 0;
        } else if (processId == null) {
            this.partition = numArr[mod(topicPartitionAssignment.incrementAndGet(), length)];
        } else {
            this.partition = numArr[mod(processId.intValue(), length)];
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Assigning partition [" + this.partition + "]");
        }
        return this.partition.intValue();
    }

    private static int mod(int i, int i2) {
        if (i2 <= 1) {
            return 0;
        }
        int i3 = i % i2;
        return i3 < 0 ? i3 + i2 : i3;
    }

    private Set<Integer> determinePartitionsFromKafka(String str) throws ExecutionException, InterruptedException {
        TopicPartitionTracker topicPartitionTracker = topicPartitionTrackerMap.get(str);
        if (topicPartitionTracker != null && !topicPartitionTracker.expired()) {
            return topicPartitionTracker.getPartitions();
        }
        Set<Integer> determinePartitions = AppianKafkaTopic.determinePartitions(ProcessHistoryKafkaProducer.getInstance().getKafkaChunkedProducer().getKafkaProducer(), str);
        if (determinePartitions.isEmpty()) {
            return new HashSet();
        }
        TopicPartitionTracker topicPartitionTracker2 = new TopicPartitionTracker(determinePartitions);
        topicPartitionTrackerMap.put(str, topicPartitionTracker2);
        return topicPartitionTracker2.getPartitions();
    }

    public Long getProcessId() {
        if (this.processId != null) {
            return this.processId;
        }
        ProcessHistoryRow[] processHistoryRowArr = this.processHistoryRows;
        if (processHistoryRowArr == null || processHistoryRowArr.length <= 0 || processHistoryRowArr[0] == null) {
            return null;
        }
        return processHistoryRowArr[0].getProcessId();
    }

    public void setProcessId(Long l) {
        this.processId = l;
    }

    public boolean isDryRun() {
        return false;
    }

    @Override // com.appiancorp.process.engine.ContinuationRequest
    public String toDebugString() {
        StringBuilder sb = new StringBuilder("ProcessHistoryRequest{");
        sb.append("topic='").append(this.topic).append('\'');
        sb.append(", processHistoryDiffs=").append(Arrays.toString(this.processHistoryRows));
        sb.append('}');
        return sb.toString();
    }

    public int findIdIndex(Id id) {
        return findIdIndex(id, 0);
    }

    public int findIdIndex(Id id, int i) {
        Id[] idArr = (Id[]) Arrays.stream(this.processHistoryRows).map((v0) -> {
            return v0.getId();
        }).toArray(i2 -> {
            return new Id[i2];
        });
        int length = idArr.length;
        for (int i3 = i; i3 < length; i3++) {
            if (id.equals(idArr[i3])) {
                return i3;
            }
        }
        return -1;
    }

    static /* synthetic */ KafkaExecutionConfiguration access$100() {
        return getKafkaExecutionConfiguration();
    }
}
