package com.appiancorp.common.initialize;

import com.appiancorp.common.config.ConfigObject;
import com.appiancorp.process.common.service.MultipleProcessServersServiceFactory;
import com.appiancorp.process.execution.service.ExtendedProcessExecutionService;
import com.appiancorp.process.execution.service.ProcessHistoryKafkaPurger;
import com.appiancorp.suite.cfg.ConfigurationFactory;
import com.appiancorp.suite.cfg.FeatureToggleConfiguration;
import com.appiancorp.suiteapi.common.ServiceLocator;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/common/initialize/MigrateProcessHistoryStrayProcesses.class */
/**
 * One-time migration that asks {@link ProcessHistoryKafkaPurger} to purge stray
 * process-history Kafka keys, once per server index reported by the process
 * execution service.
 *
 * <p>Completion is recorded through a {@link DefaultMigrationFlag} named
 * {@link #MIGRATION_FLAG_NAME} at version {@link #PROCESS_HISTORY_STRAY_PROCESSES}.
 * The flag is only set after every index has been purged without an exception, so a
 * failed run leaves the flag unset.
 */
public class MigrateProcessHistoryStrayProcesses extends ConfigObject {
    private static final Logger LOG = Logger.getLogger(MigrateProcessHistoryStrayProcesses.class);

    /** First constructor argument of the purger; presumably a timeout in milliseconds, per the name. */
    private static final long TIMEOUT_MS = 10000;

    /** Name under which the migration flag is stored. */
    static final String MIGRATION_FLAG_NAME = "ProcessHistoryStrayProcesses";

    /** Migration version checked and recorded on the migration flag. */
    public static final int PROCESS_HISTORY_STRAY_PROCESSES = 2;

    /**
     * Runs the migration if it has not been recorded yet.
     *
     * <ul>
     *   <li>If the flag already reports this version, nothing happens.</li>
     *   <li>If process history in Kafka is disabled, the flag is set immediately.</li>
     *   <li>Otherwise the purge is started on a new thread and this method returns
     *       without waiting for it.</li>
     * </ul>
     *
     * @throws Exception declared for compatibility with existing callers
     */
    public void finish() throws Exception {
        final DefaultMigrationFlag migrationFlag = new DefaultMigrationFlag(MIGRATION_FLAG_NAME);
        if (migrationFlag.hasMigrationOccurred(PROCESS_HISTORY_STRAY_PROCESSES)) {
            return;
        }

        FeatureToggleConfiguration featureToggles =
                (FeatureToggleConfiguration) ConfigurationFactory.getConfiguration(FeatureToggleConfiguration.class);
        if (!featureToggles.isProcessHistoryInKafkaEnabled()) {
            LOG.info("Skipping ProcessHistoryStrayProcesses migration since process history in Kafka is not enabled");
            migrationFlag.setMigrationOccurred(PROCESS_HISTORY_STRAY_PROCESSES);
            return;
        }

        LOG.info("Starting ProcessHistoryStrayProcesses migration");
        new Thread(new Runnable() {
            @Override
            public void run() {
                purgeStrayKeys(migrationFlag);
            }
        }).start();
    }

    /**
     * Purges stray keys for every server index and, on success, records the migration.
     *
     * <p>The whole task is guarded, including service lookup and the flag update, so
     * that any failure is reported through {@link #LOG} instead of escaping the
     * background thread unlogged. On failure the flag is left unset.
     *
     * @param migrationFlag flag to mark once all indexes have been purged
     */
    private static void purgeStrayKeys(DefaultMigrationFlag migrationFlag) {
        try {
            ExtendedProcessExecutionService executionService =
                    (ExtendedProcessExecutionService) ServiceLocator.getService(ExtendedProcessExecutionService.SERVICE_NAME);
            ProcessHistoryKafkaPurger purger = new ProcessHistoryKafkaPurger(TIMEOUT_MS, executionService);
            int serverCount = MultipleProcessServersServiceFactory.getExtendedProcessExecutionService().getNumServers();
            for (int shard = 0; shard < serverCount; shard++) {
                LOG.info("On shard [" + shard + "] tombstoned [" + purger.purgeStray(shard) + "] stray kafka keys");
            }
            migrationFlag.setMigrationOccurred(PROCESS_HISTORY_STRAY_PROCESSES);
            LOG.info("Successfully completed ProcessHistoryStrayProcesses migration");
        } catch (Exception e) {
            LOG.error("Failed ProcessHistoryStrayProcesses migration", e);
        }
    }
}
