package com.appiancorp.process.execution.service;

import com.appiancorp.process.history.ProcessHistoryKafkaTransactionId;
import com.appiancorp.process.kafka.AppianKafkaConsumerFactory;
import com.appiancorp.process.kafka.AppianKafkaTopic;
import com.appiancorp.process.kafka.KafkaSingleChunk;
import com.appiancorp.process.kafka.KafkaSingleChunkLowLevelConsumer;
import com.appiancorp.process.kafka.KafkaTransactionId;
import com.appiancorp.suiteapi.common.exceptions.InvalidProcessModelException;
import com.appiancorp.suiteapi.common.exceptions.PrivilegeException;
import com.appiancorp.suiteapi.process.TypedVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/process/execution/service/ProcessHistoryKafkaScanner.class */
/**
 * Scans the process-history Kafka topic of a server and rewrites the minimum and maximum
 * offsets stored on each known {@link ProcessHistoryLocation}.
 *
 * <p>For every partition of the server's current topic version the scanner reads chunks
 * starting at a caller-supplied offset, records the first and last offset at which each
 * known process id was seen, and pushes the touched locations back through
 * {@link ExtendedProcessExecutionService#updateOffsetsForProcessHistoryLocations} in batches.
 */
public class ProcessHistoryKafkaScanner {
    // NOTE(review): not referenced anywhere in this class; presumably it names one of the
    // literal 0 arguments passed to browseDeletedProcessesPaging below - confirm before wiring it in.
    private static final int DELETED_PROCESSES_SORT_ORDER = 0;
    private final int processHistoryLocationBatchSize;
    private final long timeoutMs;
    private final ExtendedProcessExecutionService extendedProcessExecutionService;
    protected static final Logger LOG = Logger.getLogger(ProcessHistoryKafkaScanner.class);
    /** Upper offset bound handed to the chunk consumer; effectively "read to the end". */
    private static final long MAX_OFFSET_TO_READ = Long.MAX_VALUE;
    /** Start offset used by {@link #recoverOffsets(int)} when the caller does not supply one. */
    private static final long DEFAULT_MIN_OFFSET_TO_READ = 0;

    /**
     * @param processHistoryLocationBatchSize maximum number of locations sent per update call;
     *        must be positive for a flush to succeed
     * @param timeoutMs timeout, in milliseconds, passed through to the chunk consumer
     * @param extendedProcessExecutionService service used to look up and update history locations
     */
    public ProcessHistoryKafkaScanner(int processHistoryLocationBatchSize, long timeoutMs,
            ExtendedProcessExecutionService extendedProcessExecutionService) {
        this.processHistoryLocationBatchSize = processHistoryLocationBatchSize;
        this.timeoutMs = timeoutMs;
        this.extendedProcessExecutionService = extendedProcessExecutionService;
    }

    /**
     * Recovers offsets for the given server, reading each partition from
     * {@code DEFAULT_MIN_OFFSET_TO_READ}.
     *
     * @param serverId server identifier passed to the execution service
     * @throws Exception any failure raised while reading or flushing; it is logged and rethrown
     */
    public void recoverOffsets(int serverId) throws Exception {
        recoverOffsets(serverId, DEFAULT_MIN_OFFSET_TO_READ);
    }

    /**
     * Recovers offsets for the given server, reading each partition starting at
     * {@code minOffsetToRead}.
     *
     * @param serverId server identifier passed to the execution service
     * @param minOffsetToRead first offset to read in every partition
     * @throws Exception any failure raised while reading or flushing; it is logged together with
     *         the last offset reached in the failing partition and then rethrown
     */
    public void recoverOffsets(int serverId, long minOffsetToRead) throws Exception {
        String topic = this.extendedProcessExecutionService.getCurrentTopicVersionForServer(serverId);
        Map<Long, ProcessHistoryLocation> locationsByProcessId = prepareProcessHistoryMap(serverId, topic);
        for (int partition : AppianKafkaTopic.determinePartitions(topic)) {
            // Process ids seen in this partition; only these are flushed afterwards.
            Set<Long> seenProcessIds = new HashSet<>();
            // Tracked per partition so an early failure does not report a previous partition's offset.
            long lastOffset = minOffsetToRead;
            // The raw KafkaConsumer type is kept on purpose: the factory's and the chunk
            // consumer's generic parameters are not visible from this class.
            // Resources close in reverse order: the chunk consumer first, then the Kafka consumer.
            try (KafkaConsumer kafkaConsumer = AppianKafkaConsumerFactory.getKafkaConsumer();
                    KafkaSingleChunkLowLevelConsumer chunkConsumer = KafkaSingleChunkLowLevelConsumer.of(
                            kafkaConsumer, this.timeoutMs, topic, partition, minOffsetToRead, MAX_OFFSET_TO_READ)) {
                Iterator<KafkaSingleChunk> chunks = chunkConsumer.iterator();
                while (chunks.hasNext()) {
                    KafkaSingleChunk chunk = chunks.next();
                    lastOffset = chunkConsumer.getOffset();
                    KafkaTransactionId transactionId = chunk.getKafkaTransactionId();
                    if (!(transactionId instanceof ProcessHistoryKafkaTransactionId)) {
                        continue;
                    }
                    long processId = ((ProcessHistoryKafkaTransactionId) transactionId).getProcessId();
                    ProcessHistoryLocation location = locationsByProcessId.get(Long.valueOf(processId));
                    if (location == null) {
                        continue;
                    }
                    // First sighting in this partition fixes the minimum; every sighting moves the maximum.
                    if (!seenProcessIds.contains(Long.valueOf(processId))) {
                        location.setMinOffsetAsLong(lastOffset);
                    }
                    location.setMaxOffsetAsLong(lastOffset);
                    seenProcessIds.add(Long.valueOf(processId));
                }
                flushProcessHistoryLocationsToK(serverId, locationsByProcessId, seenProcessIds);
                LOG.info("Recovered offsets for " + locationsByProcessId.size() + " processes on topic [" + topic + "] partition [" + partition + "]");
            } catch (Error e) {
                LOG.error("Error occurred at offset [" + lastOffset + "]", e);
                throw e;
            } catch (Exception e2) {
                LOG.error("Exception occurred at offset [" + lastOffset + "]", e2);
                throw e2;
            }
        }
    }

    /**
     * Indexes every known history location by its process id.
     *
     * <p>{@code Collectors.toMap} without a merge function throws {@link IllegalStateException}
     * if two locations share a process id.
     */
    private Map<Long, ProcessHistoryLocation> prepareProcessHistoryMap(int serverId, String topic) {
        return getAllNonPurgedProcessHistoryLocations(serverId, topic).stream()
                .collect(Collectors.toMap(ProcessHistoryLocation::getProcessId, Function.identity()));
    }

    /**
     * Collects the history locations of all process ids reported for the server, plus those
     * deleted-process locations whose topic equals {@code topic}.
     *
     * <p>Failures while listing deleted processes are logged and ignored, so the result then
     * contains only the locations of the server's current process ids.
     */
    private List<ProcessHistoryLocation> getAllNonPurgedProcessHistoryLocations(int serverId, String topic) {
        List<ProcessHistoryLocation> locations = new ArrayList<>(Arrays.asList(
                this.extendedProcessExecutionService.locateProcessAuditHistory(
                        this.extendedProcessExecutionService.getAllProcessIdsForServer(serverId))));
        try {
            for (ProcessHistoryLocation deleted : getAllDeletedProcessHistoryLocations()) {
                if (topic.equals(deleted.getTopic())) {
                    locations.add(deleted);
                }
            }
        } catch (PrivilegeException e) {
            // Pass the exception itself so the stack trace is not lost.
            LOG.error("PrivilegeException! This should not happen " + e.getMessage(), e);
        } catch (InvalidProcessModelException e2) {
            LOG.error("Invalid Process Model! Ignoring invalid deleted processes and moving on", e2);
        }
        return locations;
    }

    /**
     * Fetches the deleted-process page from the execution service and returns its results.
     *
     * <p>NOTE(review): the meaning of the literal arguments (0, TypedVariable.MAX_TYPE, 0, 0) is
     * not visible here; they look like start index, batch size and sort settings - confirm
     * against the service signature.
     */
    private ProcessHistoryLocation[] getAllDeletedProcessHistoryLocations() throws PrivilegeException, InvalidProcessModelException {
        return (ProcessHistoryLocation[]) this.extendedProcessExecutionService.browseDeletedProcessesPaging(0, TypedVariable.MAX_TYPE, 0, 0).getResults();
    }

    /**
     * Sends the locations whose process id is in {@code set} to the execution service, in
     * batches of at most {@code processHistoryLocationBatchSize}. Nothing is sent when no
     * location matches.
     *
     * @param serverId currently unused
     * @param map all known locations keyed by process id
     * @param set process ids whose locations were updated and must be persisted
     * @throws IllegalStateException if the configured batch size is not positive
     */
    private void flushProcessHistoryLocationsToK(int serverId, Map<Long, ProcessHistoryLocation> map, Set<Long> set) {
        int batchSize = this.processHistoryLocationBatchSize;
        if (batchSize <= 0) {
            // A zero batch size would never advance and a negative one yields an invalid sub-list range.
            throw new IllegalStateException("processHistoryLocationBatchSize must be positive but was " + batchSize);
        }
        List<ProcessHistoryLocation> touched = map.values().stream()
                .filter(location -> set.contains(location.getProcessId()))
                .collect(Collectors.toList());
        int total = touched.size();
        int start = 0;
        while (start < total) {
            // Computed from the remaining count so start + batchSize cannot overflow int.
            int end = start + Math.min(batchSize, total - start);
            this.extendedProcessExecutionService.updateOffsetsForProcessHistoryLocations(
                    touched.subList(start, end).toArray(new ProcessHistoryLocation[0]));
            start = end;
        }
    }
}
