package com.appiancorp.process.execution.service;

import com.appiancorp.dataexport.ExcelDocumentCreator;
import com.appiancorp.process.common.service.MultipleProcessServersServiceFactory;
import com.appiancorp.process.kafka.AppianKafkaAdminClientFactory;
import com.appiancorp.process.kafka.AppianKafkaTopic;
import com.appiancorp.security.auth.SpringSecurityContextHelper;
import com.appiancorp.suiteapi.common.ServiceLocator;
import com.appiancorp.suiteapi.common.exceptions.InvalidProcessModelException;
import com.appiancorp.suiteapi.common.exceptions.PrivilegeException;
import com.appiancorp.suiteapi.process.TypedVariable;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observer;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/process/execution/service/ProcessHistoryKafkaCLICompactor.class */
/**
 * Command-line tool that rewrites the process-history Kafka topic of one shard onto a new
 * ("incremented") topic and then switches the shard over to it.
 *
 * <p>The work is done by {@link #run()}: it takes the shard's Kafka topic lock, recreates the
 * incremented topic, copies the history in two passes (the second pass runs while requests are
 * being duplicated to both topics), increments the topic version, deletes the old topic and
 * releases the lock. With the {@code -u} option {@link #main(String[])} only releases the lock.
 */
public class ProcessHistoryKafkaCLICompactor {
    protected static final Logger LOG = Logger.getLogger(ProcessHistoryKafkaCLICompactor.class);

    // Command-line option short names.
    private static final String OPTION_HELP = "h";
    private static final String OPTION_SID = "sid";
    private static final String OPTION_UNLOCK_KAFKA = "u";
    private static final String OPTION_KAFKA_CONSUMER_TIMEOUT = "rt";
    private static final String OPTION_MAX_RECORDS_IN_MEMORY = "max";
    private static final String OPTION_BATCH_SIZE = "b";
    private static final String OPTION_KAFKA_PRODUCER_TIMEOUT = "wt";
    private static final String OPTION_KAFKA_PRODUCER_LINGER = "l";

    // Keys of the rewriter configuration map built from the command line.
    protected static final String MAX_RECORDS_IN_MEMORY_CONFIG = "max-record";
    protected static final String KAFKA_CONSUMER_TIMEOUT_CONFIG = "consumer-timeout";
    protected static final String PROCESS_HISTORY_LOCATION_BATCH_SIZE_CONFIG = "batch-size";

    // Defaults applied when the corresponding option is absent (see the option help texts).
    private static final int DELETED_PROCESSES_SORT_ORDER = 0;
    private static final int DEFAULT_TIMEOUT = 10000;
    private static final int DEFAULT_BATCH_SIZE = 10000;
    private static final int DEFAULT_MAX_RECORDS_IN_MEMORY = 2000000;

    // NOTE(review): the long name "consumer timeout" contains a space, unlike the other long
    // names; it is kept as-is so the accepted command line does not change.
    private static final Options options = new Options()
            .addOption(OPTION_HELP, "help", false, "Show this message")
            .addOption(OPTION_SID, "serverId", true, "The shard id which will be used to grab the topic version")
            .addOption(OPTION_UNLOCK_KAFKA, "unlock", false, "Releases the Kafka lock on the shard")
            .addOption(OPTION_KAFKA_CONSUMER_TIMEOUT, "consumer timeout", true,
                    "Kafka consumer time out - Default is " + DEFAULT_TIMEOUT + "ms")
            .addOption(OPTION_MAX_RECORDS_IN_MEMORY, "max-records", true,
                    "Maximum number of records to keep in memory when reading from Kafka - Default is "
                            + DEFAULT_MAX_RECORDS_IN_MEMORY)
            .addOption(OPTION_BATCH_SIZE, PROCESS_HISTORY_LOCATION_BATCH_SIZE_CONFIG, true,
                    "Maximum number of process history locations to be flushed to Kafka - Default is "
                            + DEFAULT_BATCH_SIZE)
            .addOption(OPTION_KAFKA_PRODUCER_TIMEOUT, "producer-timeout", true,
                    "Kafka producer time out - Default is 60000ms")
            .addOption(OPTION_KAFKA_PRODUCER_LINGER, "linger", true,
                    "Kafka producer linger time - Default is 0ms");

    private final int shardId;
    private final String incrementedTopic;
    private final String sourceTopic;
    private final BufferedOutputStream out;
    private final TopicRewriter topicRewriter;
    private final ExtendedProcessExecutionService extendedProcessExecutionService;

    /**
     * Creates a compactor for one shard.
     *
     * <p>Sets the admin Spring security context, looks up the
     * {@link ExtendedProcessExecutionService}, sets up the evaluation environment, reads the
     * shard's current and incremented topic names and builds the {@link TopicRewriter}.
     *
     * @param printStream stream the progress headers are written to
     * @param i shard (server) id
     * @param map rewriter configuration keyed by the {@code *_CONFIG} constants; missing keys
     *            fall back to this class's {@code DEFAULT_*} values
     * @throws IOException if a header cannot be written to {@code printStream}
     * @throws ExecutionException declared for callers; not thrown directly by this constructor
     * @throws InterruptedException declared for callers; not thrown directly by this constructor
     */
    public ProcessHistoryKafkaCLICompactor(PrintStream printStream, int i, Map<String, Integer> map) throws IOException, ExecutionException, InterruptedException {
        this.out = new BufferedOutputStream(printStream);
        printHeaderToStdout("Running in Appian Administrator Context...");
        SpringSecurityContextHelper.setAdminSpringSecurityContextLazy();
        this.extendedProcessExecutionService = (ExtendedProcessExecutionService) ServiceLocator.getService(ExtendedProcessExecutionService.SERVICE_NAME);
        setupEvaluationEnvironment();
        this.shardId = i;
        KafkaVersionedTopic versionedTopic = this.extendedProcessExecutionService.peekIncrementedTopicVersion(this.shardId);
        this.incrementedTopic = versionedTopic.getIncrementedTopic();
        this.sourceTopic = versionedTopic.getCurrentTopic();

        // Each setting falls back to its own documented default rather than to an unrelated
        // constant borrowed from another class.
        int maxRecordsInMemory = map.getOrDefault(MAX_RECORDS_IN_MEMORY_CONFIG, DEFAULT_MAX_RECORDS_IN_MEMORY);
        int batchSize = map.getOrDefault(PROCESS_HISTORY_LOCATION_BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
        int consumerTimeout = map.getOrDefault(KAFKA_CONSUMER_TIMEOUT_CONFIG, DEFAULT_TIMEOUT);
        this.topicRewriter = new TopicRewriter(maxRecordsInMemory, batchSize, consumerTimeout,
                this.extendedProcessExecutionService, buildKafkaConfigs(map), this.sourceTopic, this.incrementedTopic);
    }

    /**
     * Same as the three-argument constructor, additionally registering {@code observer} on the
     * topic rewriter when it is not {@code null}.
     *
     * @param printStream stream the progress headers are written to
     * @param i shard (server) id
     * @param map rewriter configuration keyed by the {@code *_CONFIG} constants
     * @param observer observer to attach to the topic rewriter; may be {@code null}
     * @throws IOException if a header cannot be written to {@code printStream}
     * @throws ExecutionException declared for callers
     * @throws InterruptedException declared for callers
     */
    public ProcessHistoryKafkaCLICompactor(PrintStream printStream, int i, Map<String, Integer> map, Observer observer) throws IOException, ExecutionException, InterruptedException {
        this(printStream, i, map);
        if (observer != null) {
            this.topicRewriter.addObserver(observer);
        }
    }

    /**
     * Converts the rewriter configuration into the string map handed to the topic rewriter,
     * leaving out the consumer-timeout and max-records entries, which are passed to the
     * rewriter as separate constructor arguments.
     */
    private Map<String, String> buildKafkaConfigs(Map<String, Integer> map) {
        return map.entrySet().stream()
                .filter(entry -> !KAFKA_CONSUMER_TIMEOUT_CONFIG.equals(entry.getKey())
                        && !MAX_RECORDS_IN_MEMORY_CONFIG.equals(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
    }

    /**
     * Command-line entry point. Parses the arguments and, when a valid shard id is given,
     * either releases the shard's Kafka topic lock ({@code -u}) or runs the rewrite.
     * Failures are reported on stderr and in the log; the method itself never throws.
     *
     * @param strArr raw command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            CommandLine commandLine = new DefaultParser().parse(options, strArr);
            parseArgumentsAndReturnServerId(commandLine).ifPresent(shard -> {
                try {
                    ProcessHistoryKafkaCLICompactor compactor = new ProcessHistoryKafkaCLICompactor(
                            System.out, shard, parseArgumentsAndReturnRewriterConfigs(commandLine));
                    if (commandLine.hasOption(OPTION_UNLOCK_KAFKA)) {
                        compactor.releaseKafkaTopicLock(shard);
                        LOG.info("Released the Kafka topic lock for shard " + shard);
                    } else {
                        compactor.run();
                    }
                } catch (Exception e) {
                    reportFailure(e);
                }
            });
        } catch (Exception e) {
            reportFailure(e);
        }
    }

    /** Reports a top-level failure on stderr and, with its stack trace, in the log. */
    private static void reportFailure(Exception e) {
        // getMessage() may be null; fall back to toString() so something useful is printed.
        System.err.println(e.getMessage() != null ? e.getMessage() : e.toString());
        LOG.error("Process history topic rewrite failed", e);
    }

    /**
     * Rewrites the shard's source topic onto the incremented topic and switches the shard over.
     *
     * <p>Returns without doing anything when offset recovery is in progress for the shard or
     * when the Kafka topic lock cannot be acquired. Any failure is logged, not rethrown:
     * <ul>
     *   <li>before the lock is acquired, nothing is cleaned up, because this run owns neither
     *       the lock nor the incremented topic;</li>
     *   <li>before the topic switch, request duplication is stopped, the incremented topic is
     *       deleted and the lock is released;</li>
     *   <li>after the topic switch, only the lock is released, because the incremented topic is
     *       the one in use by then and must not be deleted.</li>
     * </ul>
     *
     * @throws ExecutionException if the cleanup after a failed run itself fails
     * @throws InterruptedException if the cleanup after a failed run is interrupted
     */
    public void run() throws ExecutionException, InterruptedException {
        boolean lockAcquired = false;
        boolean topicSwitched = false;
        try {
            LOG.info("Started at " + new Date());
            if (this.extendedProcessExecutionService.isOffsetRecoveryHappening(this.shardId)) {
                System.err.println("We are recovering shard " + this.shardId + " from a Mirror Maker backup. Try again later!");
                return;
            }
            if (!this.extendedProcessExecutionService.acquireKafkaTopicLock(this.shardId)) {
                System.err.println("Shard is currently being used by other Kafka CLI jobs");
                return;
            }
            lockAcquired = true;

            // First pass: copy from offset 0 of every partition of the source topic.
            long[] startOffsets = new long[AppianKafkaTopic.determinePartitions(this.sourceTopic).size()];
            initTopicAndCleanUp(this.incrementedTopic);
            long[] secondPassOffsets = rewriteFrom(startOffsets);

            // Second pass: copy what arrived during the first pass while new requests are
            // being written to both topics.
            startWritingToBothTopicsSimultaneously();
            LOG.info("Started second pass at " + new Date());
            rewriteFrom(secondPassOffsets);

            LOG.info("Stop writing to both topics concurrently and increment topic");
            incrementTopicAndStopWritingToBothTopicsSimultaneously();
            // NOTE(review): if the call above fails after the version was incremented but
            // before duplication was stopped, this flag stays false - confirm that case.
            topicSwitched = true;
            LOG.info("Successfully incremented topic to " + this.incrementedTopic);

            LOG.info("Deleting " + this.sourceTopic);
            AppianKafkaAdminClientFactory.deleteTopic(this.sourceTopic);
            releaseKafkaTopicLock(this.shardId);
            LOG.info("Successfully finished rewriting " + this.sourceTopic + " on " + this.incrementedTopic + " at " + new Date());
        } catch (Exception e) {
            try {
                handleRunFailure(e, lockAcquired, topicSwitched);
            } finally {
                // Restore the interrupt flag only after cleanup, so the cleanup calls are not
                // aborted immediately by a pending interrupt.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Logs a failed run and performs the cleanup appropriate to how far the run got. */
    private void handleRunFailure(Exception e, boolean lockAcquired, boolean topicSwitched) throws ExecutionException, InterruptedException {
        if (!lockAcquired) {
            LOG.error("Failed before the Kafka topic lock for shard " + this.shardId + " was acquired; nothing to clean up", e);
            return;
        }
        if (topicSwitched) {
            LOG.error("Topic was incremented to " + this.incrementedTopic + " but deleting " + this.sourceTopic
                    + " or releasing the lock failed; keeping " + this.incrementedTopic, e);
            releaseKafkaTopicLock(this.shardId);
            return;
        }
        LOG.error("Deleting the increment topic. \n" + e.getMessage(), e);
        exitGracefully();
    }

    /** Releases the Kafka topic lock of shard {@code i}. */
    private void releaseKafkaTopicLock(int i) {
        this.extendedProcessExecutionService.releaseKafkaTopicLock(i);
    }

    /**
     * Rewrites all non-duplicated process history locations, starting at the given offsets.
     *
     * @param jArr offsets passed to the topic rewriter as the position to start from
     * @return the rewriter's maximum read offsets, each incremented by one, for use as the
     *         starting offsets of a following pass
     * @throws Exception if the rewriter fails
     */
    protected long[] rewriteFrom(long[] jArr) throws Exception {
        List<ProcessHistoryLocation> locations = getAllNonPurgedProcessHistoryLocations().stream()
                .filter(location -> !location.isDuplicated())
                .collect(Collectors.toList());
        this.topicRewriter.rewrite(locations, jArr);
        return Arrays.stream(this.topicRewriter.getMaximumOffsetToRead())
                .map(offset -> offset + 1)
                .toArray();
    }

    /**
     * Collects the history locations of all processes on this shard, plus the locations of
     * deleted processes that are stored on the source topic. Failures while browsing deleted
     * processes are logged and the deleted processes are left out.
     */
    private List<ProcessHistoryLocation> getAllNonPurgedProcessHistoryLocations() {
        List<ProcessHistoryLocation> locations = new ArrayList<>(Arrays.asList(
                this.extendedProcessExecutionService.locateProcessAuditHistory(
                        this.extendedProcessExecutionService.getAllProcessIdsForServer(this.shardId))));
        try {
            for (ProcessHistoryLocation deleted : getAllDeletedProcessHistoryLocations()) {
                if (this.sourceTopic.equals(deleted.getTopic())) {
                    locations.add(deleted);
                }
            }
        } catch (InvalidProcessModelException e) {
            LOG.error("Invalid Process Model! Ignoring invalid deleted processes and moving on", e);
        } catch (PrivilegeException e) {
            LOG.error("PrivilegeException! This should not happen " + e.getMessage(), e);
        }
        return locations;
    }

    /**
     * Fetches the history locations of deleted processes from the execution service.
     * NOTE(review): the paging arguments are reproduced exactly as found; their meaning
     * (start index, batch size, sort property/order) is presumed - confirm against the service.
     */
    private ProcessHistoryLocation[] getAllDeletedProcessHistoryLocations() throws PrivilegeException, InvalidProcessModelException {
        return (ProcessHistoryLocation[]) this.extendedProcessExecutionService.browseDeletedProcessesPaging(0, TypedVariable.MAX_TYPE, 0, 0).getResults();
    }

    /** Asks the execution service to start duplicating process history requests for this shard. */
    protected void startWritingToBothTopicsSimultaneously() {
        this.extendedProcessExecutionService.startDuplicatingProcessHistoryRequests(this.shardId);
    }

    /** Increments the shard's topic version to the incremented topic, then stops request duplication. */
    protected void incrementTopicAndStopWritingToBothTopicsSimultaneously() {
        this.extendedProcessExecutionService.incrementTopicVersion(this.shardId, this.incrementedTopic);
        this.extendedProcessExecutionService.stopDuplicatingProcessHistoryRequests(this.shardId);
    }

    /** @return the name of the topic the history is rewritten onto */
    protected String getIncrementedTopic() {
        return this.incrementedTopic;
    }

    /** @return the name of the topic the history is read from */
    protected String getSourceTopic() {
        return this.sourceTopic;
    }

    /**
     * Deletes the topic if it exists, cleans the shard's prepared offsets and creates the topic.
     *
     * @param str topic name
     * @throws ExecutionException if a Kafka admin operation fails
     * @throws InterruptedException if a Kafka admin operation is interrupted
     */
    protected void initTopicAndCleanUp(String str) throws ExecutionException, InterruptedException {
        deleteTopicIfExistsAndCleanUp(str);
        AppianKafkaAdminClientFactory.createTopic(str);
    }

    /** Deletes the topic when present and cleans the shard's prepared offsets. */
    private void deleteTopicIfExistsAndCleanUp(String str) throws ExecutionException, InterruptedException {
        if (AppianKafkaAdminClientFactory.isTopicPresent(str)) {
            AppianKafkaAdminClientFactory.deleteTopic(str);
        }
        this.extendedProcessExecutionService.cleanPreparedOffsets(this.shardId);
    }

    /**
     * Undoes a failed run: stops request duplication and deletes the incremented topic. The
     * Kafka topic lock is released even when one of those steps fails, so a failed cleanup
     * does not leave the shard locked.
     */
    private void exitGracefully() throws ExecutionException, InterruptedException {
        try {
            this.extendedProcessExecutionService.stopDuplicatingProcessHistoryRequests(this.shardId);
            deleteTopicIfExistsAndCleanUp(this.incrementedTopic);
        } finally {
            releaseKafkaTopicLock(this.shardId);
        }
    }

    /**
     * Extracts and validates the shard id from the command line.
     *
     * @param commandLine parsed command line
     * @return the shard id, or empty when help was requested or {@code -sid} is missing, not an
     *         integer, or outside {@code 0..numServers-1}; usage is printed in those cases
     */
    public static Optional<Integer> parseArgumentsAndReturnServerId(CommandLine commandLine) {
        if (commandLine.hasOption(OPTION_HELP)) {
            printUsage();
            return Optional.empty();
        }
        if (!commandLine.hasOption(OPTION_SID)) {
            System.err.println("must specify a -sid in order to use this script");
            printUsage();
            return Optional.empty();
        }
        String rawServerId = commandLine.getOptionValue(OPTION_SID);
        int serverId;
        try {
            serverId = Integer.parseInt(rawServerId);
        } catch (NumberFormatException e) {
            System.err.println("-sid must be an integer but was '" + rawServerId + "'");
            printUsage();
            return Optional.empty();
        }
        int maxServerId = MultipleProcessServersServiceFactory.getExtendedProcessExecutionService().getNumServers() - 1;
        // Negative ids are rejected too; the error message has always promised "between 0 and N".
        if (serverId < 0 || serverId > maxServerId) {
            System.err.println("must specify a valid -sid (between 0 and " + maxServerId + ") in order to use this script");
            printUsage();
            return Optional.empty();
        }
        return Optional.of(serverId);
    }

    /**
     * Reads the numeric tuning options from the command line.
     *
     * @param commandLine parsed command line
     * @return a map from configuration key to value containing only the options that were
     *         supplied with a valid integer; an option with a non-integer value is reported on
     *         stderr and skipped without affecting the other options
     */
    public static Map<String, Integer> parseArgumentsAndReturnRewriterConfigs(CommandLine commandLine) {
        Map<String, Integer> configs = new HashMap<>();
        putIntOption(commandLine, OPTION_KAFKA_CONSUMER_TIMEOUT, KAFKA_CONSUMER_TIMEOUT_CONFIG, configs);
        putIntOption(commandLine, OPTION_MAX_RECORDS_IN_MEMORY, MAX_RECORDS_IN_MEMORY_CONFIG, configs);
        putIntOption(commandLine, OPTION_BATCH_SIZE, PROCESS_HISTORY_LOCATION_BATCH_SIZE_CONFIG, configs);
        putIntOption(commandLine, OPTION_KAFKA_PRODUCER_TIMEOUT, TopicRewriter.KAFKA_PRODUCER_TIMEOUT_CONFIG, configs);
        putIntOption(commandLine, OPTION_KAFKA_PRODUCER_LINGER, TopicRewriter.KAFKA_PRODUCER_LINGER_CONFIG, configs);
        return configs;
    }

    /** Stores the integer value of {@code option} under {@code configKey} when the option is present and valid. */
    private static void putIntOption(CommandLine commandLine, String option, String configKey, Map<String, Integer> configs) {
        if (!commandLine.hasOption(option)) {
            return;
        }
        String rawValue = commandLine.getOptionValue(option);
        try {
            configs.put(configKey, Integer.parseInt(rawValue));
        } catch (NumberFormatException e) {
            System.err.println("Ignoring option -" + option + ": '" + rawValue + "' is not an integer; the default will be used");
            LOG.warn("Ignoring non-integer value for option -" + option, e);
        }
    }

    /** Prints a progress header and sets up the evaluation environment. */
    private void setupEvaluationEnvironment() throws IOException {
        printHeaderToStdout("Setting up Evaluation Environment...");
        SetupMinimalWithServerThunk.setupEvaluationEnvironment();
    }

    /** Prints the command-line usage. */
    private static void printUsage() {
        new HelpFormatter().printHelp("topicrewrite[.sh|.bat] -sid", options);
    }

    /**
     * Writes {@code str} framed by a line of dashes above and below, then flushes.
     * The dash line is as long as the header including its line separator.
     */
    private void printHeaderToStdout(String str) throws IOException {
        String header = str + System.lineSeparator();
        String rule = String.join("", Collections.nCopies(header.length(), "-")) + System.lineSeparator();
        byte[] ruleBytes = rule.getBytes(StandardCharsets.UTF_8);
        this.out.write(ruleBytes);
        this.out.write(header.getBytes(StandardCharsets.UTF_8));
        this.out.write(ruleBytes);
        this.out.flush();
    }
}
