package com.appiancorp.common;

import com.appiancorp.monitoring.prometheus.MonitoredScheduledThreadPoolExecutor;
import com.appiancorp.monitoring.prometheus.MonitoredThreadPoolExecutor;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

@ThreadSafe
/* loaded from: input_file:com/appiancorp/common/BatchedProcessor.class */
public class BatchedProcessor<T, R> implements Closeable {
    /** Shared log4j logger; the instance-level {@code log(...)} helpers prefix messages with the queue name. */
    private static final Logger LOG = Logger.getLogger(BatchedProcessor.class);
    /** Name supplied through the builder; appears in thread names, log prefixes and exception messages. */
    private final String queueName;
    /**
     * Chunk size used to partition {@code Iterable} input, the accumulated-size threshold that triggers
     * processing, and the maximum number of items requested from the queue per execution.
     */
    private final int batchSize;
    /** User-supplied function applied to each list of items removed from the queue. */
    private final Function<List<T>, R> function;
    /** Pool that runs the drain loops and, through {@link #decoratedExecutor}, the batch function. */
    private final ExecutorService executor;
    /** Listening decorator around {@link #executor}, so result callbacks can be attached to submitted batches. */
    private final ListeningExecutorService decoratedExecutor;
    /** Executor handed to {@code Futures.addCallback} for running {@link #callback}. */
    private final ExecutorService callbackExecutor;
    /** Pending items; constructed with the builder's max queue size. A rejected {@code add} is counted as a drop. */
    private final BatchedProcessorQueue<T> queue;
    /** Running total of items that the queue refused to accept. */
    private final AtomicLong droppedCount;
    /** Marker for an in-progress dropping episode, managed by {@code drop()}/{@code stoppedDrop()}; 0 means none is recorded. */
    private final AtomicLong isDropping;
    /** Pre-built INFO message logged when dropping starts. */
    private final String startDroppingMessage;
    /** Pre-built {@code String.format} pattern (one {@code %d}) logged when the queue accepts items again. */
    private final String stopDroppingMessage;
    /** Callback attached to every submitted batch: the builder's callback, or a {@code LoggingFutureCallback} if none was set. */
    private final FutureCallback<R> callback;
    /** Limits how many drain loops may be scheduled at once; {@code close()} acquires every permit before the final drain. */
    private final Semaphore executeRunning;
    /** Number of permits in {@link #executeRunning}; taken from the builder's {@code concurrentExecutes}. */
    private final int permits;
    /** Set by {@code close()}; once set, {@code add} throws {@code IllegalStateException}. */
    private final AtomicBoolean closed;
    /** Handle of the periodic flush task; present only when the builder was given a fixed-delay time unit. */
    private final Optional<ListenableScheduledFuture<?>> flushFuture;
    /** Scheduler that runs the periodic flush; present only when the builder was given a fixed-delay time unit. */
    private final Optional<ListeningScheduledExecutorService> flusher;

    /* loaded from: input_file:com/appiancorp/common/BatchedProcessor$Builder.class */
    /**
     * Fluent configuration holder for a {@link BatchedProcessor}.
     *
     * <p>The queue name, item type name and batch function are mandatory and supplied to the
     * constructor; everything else is optional. {@code concurrentExecutes} defaults to 1, and no
     * periodic flush is scheduled unless {@link #processWithFixedDelay(long, TimeUnit)} is called.
     *
     * @param <T> type of the queued items
     * @param <R> type of the result produced for each processed batch
     */
    public static class Builder<T, R> {
        private int batchSize;
        private int maxQueueSize;
        private int concurrentExecutes = 1;
        private long duration;
        private TimeUnit durationTimeUnit;
        private Function<List<T>, R> function;
        private FutureCallback<R> resultCallback;
        public String queueName;
        public String typeName;

        /**
         * @param str      queue name; must not be null
         * @param str2     item type name used in the dropping log messages; must not be null
         * @param function function applied to each batch; must not be null
         * @throws NullPointerException if any argument is null
         */
        public Builder(String str, String str2, Function<List<T>, R> function) {
            this.queueName = Preconditions.checkNotNull(str);
            this.typeName = Preconditions.checkNotNull(str2);
            this.function = Preconditions.checkNotNull(function);
        }

        /**
         * Sets the batch size and the maximum queue size.
         *
         * @param i  batch size
         * @param i2 maximum queue size; must be at least {@code i}
         * @return this builder
         * @throws IllegalArgumentException if {@code i2 < i}
         */
        public Builder<T, R> queueSizes(int i, int i2) {
            boolean maxCoversBatch = i2 >= i;
            Preconditions.checkArgument(maxCoversBatch, "max queue size must be >= batch size.");
            // Only mutate the builder once the arguments have been accepted.
            this.maxQueueSize = i2;
            this.batchSize = i;
            return this;
        }

        /**
         * Sets the value used both as the maximum pool size of the executors and as the number
         * of permits for concurrently scheduled drain loops.
         *
         * @param i a strictly positive count
         * @return this builder
         * @throws IllegalArgumentException if {@code i <= 0}
         */
        public Builder<T, R> concurrentExecutes(int i) {
            boolean positive = i > 0;
            Preconditions.checkArgument(positive, "concurrentExecutes must be greater than zero");
            this.concurrentExecutes = i;
            return this;
        }

        /**
         * Enables a periodic flush of whatever is queued, run with the given fixed delay.
         *
         * @param j        delay amount
         * @param timeUnit unit of {@code j}; must not be null
         * @return this builder
         * @throws NullPointerException if {@code timeUnit} is null
         */
        public Builder<T, R> processWithFixedDelay(long j, TimeUnit timeUnit) {
            this.duration = j;
            this.durationTimeUnit = Preconditions.checkNotNull(timeUnit);
            return this;
        }

        /**
         * Registers the callback attached to every processed batch.
         *
         * @param futureCallback callback to use; must not be null
         * @return this builder
         * @throws NullPointerException if {@code futureCallback} is null
         */
        public Builder<T, R> onResults(FutureCallback<R> futureCallback) {
            this.resultCallback = Preconditions.checkNotNull(futureCallback);
            return this;
        }

        /**
         * Creates the processor from the current configuration.
         *
         * @return a new {@link BatchedProcessor}
         */
        public BatchedProcessor<T, R> build() {
            BatchedProcessor<T, R> processor = new BatchedProcessor<T, R>(this);
            return processor;
        }
    }

    /* loaded from: input_file:com/appiancorp/common/BatchedProcessor$LoggingFutureCallback.class */
    private class LoggingFutureCallback implements FutureCallback<R> {
        private LoggingFutureCallback() {
        }

        public void onSuccess(@Nullable R r) {
        }

        public void onFailure(Throwable th) {
            BatchedProcessor.this.log(Level.ERROR, "failure", th);
        }
    }

    /**
     * Creates a new {@link Builder} with the given mandatory settings.
     *
     * <p>This is an instance method, so it can only be called on an existing processor;
     * {@link Builder} also exposes a public constructor with the same arguments.
     *
     * @param str      queue name
     * @param str2     item type name used in log messages
     * @param function function applied to each batch
     * @return a fresh builder
     */
    public Builder<T, R> builder(String str, String str2, Function<List<T>, R> function) {
        Builder<T, R> fresh = new Builder<T, R>(str, str2, function);
        return fresh;
    }

    /**
     * Builds a processor from a populated {@link Builder}.
     *
     * <p>Creates the work executor, the callback executor and, only when a fixed-delay time unit
     * was configured, a single-threaded scheduler that periodically calls
     * {@link #executeCurrent()}. All threads are daemon threads named after the queue.
     *
     * @param builder the configuration to copy from
     */
    private BatchedProcessor(Builder<T, R> builder) {
        final String name = builder.queueName;
        final String itemType = builder.typeName;
        final int maxQueueSize = builder.maxQueueSize;
        final int concurrency = builder.concurrentExecutes;
        final String threadPrefix = "Appian " + getClass().getSimpleName() + " " + name + " - ";

        this.queueName = name;
        this.closed = new AtomicBoolean();
        this.batchSize = builder.batchSize;
        this.queue = new BatchedProcessorQueue<>(maxQueueSize);
        this.droppedCount = new AtomicLong(0L);
        this.isDropping = new AtomicLong(0L);
        this.startDroppingMessage = name + " queue is at max size [" + maxQueueSize + "]. Additional " + itemType
                + " items will be dropped until the queue size decreases. Another message will be logged when the queue regains the capacity to hold new items.";
        this.stopDroppingMessage = name + " queue recovered after dropping [%d] items. New " + itemType
                + " items are now being added to the queue.";
        this.function = Preconditions.checkNotNull(builder.function);

        this.executor = new MonitoredThreadPoolExecutor(1, concurrency, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(), daemonThreads(threadPrefix + "executor-%d"),
                "BatchedProcessorExecutor");
        this.decoratedExecutor = MoreExecutors.listeningDecorator(this.executor);

        // Fall back to a callback that only logs failures when the caller registered none.
        if (builder.resultCallback == null) {
            this.callback = new LoggingFutureCallback();
        } else {
            this.callback = builder.resultCallback;
        }
        this.callbackExecutor = new MonitoredThreadPoolExecutor(1, concurrency, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(), daemonThreads(threadPrefix + "callback-%d"),
                "BatchedProcessorCallbackExecutor");

        this.permits = concurrency;
        this.executeRunning = new Semaphore(this.permits);

        if (builder.durationTimeUnit != null) {
            // Periodic flush requested: start a one-thread scheduler that drains the queue.
            ListeningScheduledExecutorService scheduler = MoreExecutors.listeningDecorator(
                    MonitoredScheduledThreadPoolExecutor.newMonitoredScheduledThreadPool(1,
                            daemonThreads(threadPrefix + "flusher-%d"), "BatchedProcessorFlusher"));
            this.flusher = Optional.of(scheduler);
            long delayMillis = builder.durationTimeUnit.toMillis(builder.duration);
            Runnable flushTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        executeCurrent();
                    } catch (Throwable th) {
                        // Never let an exception escape: that would cancel the periodic schedule.
                        log(Level.ERROR, "Error in flusher", th);
                    }
                }
            };
            ListenableScheduledFuture<?> scheduled =
                    scheduler.scheduleWithFixedDelay(flushTask, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
            this.flushFuture = Optional.<ListenableScheduledFuture<?>>of(scheduled);
        } else {
            this.flusher = Optional.absent();
            this.flushFuture = Optional.absent();
        }
    }

    /** Creates a factory for daemon threads named with the given format string. */
    private static java.util.concurrent.ThreadFactory daemonThreads(String nameFormat) {
        return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
    }

    /**
     * Returns a diagnostic snapshot of the processor: queue name, queue, batch size, dropped
     * count, dropping marker and closed flag.
     */
    @Override
    public String toString() {
        return "BatchedProcessor{" + this.queueName
                + ": queue=" + this.queue
                + ", batchSize=" + this.batchSize
                + ", droppedCount=" + this.droppedCount
                + ", isDropping=" + this.isDropping
                + ", closed=" + this.closed
                + "}";
    }

    /**
     * @return the total number of items dropped so far because the queue refused them
     */
    public long droppedCount() {
        long totalDropped = this.droppedCount.get();
        return totalDropped;
    }

    /**
     * @return the queue's {@code totalSize()} at the time of the call
     */
    public int queueSize() {
        int total = this.queue.totalSize();
        return total;
    }

    /**
     * Enqueues the given items as a single list.
     *
     * <p>Unlike {@link #add(Iterable)}, the items are not partitioned into chunks of
     * {@code batchSize} first; the list goes directly to the internal list-based add.
     *
     * @param tArr items to enqueue
     * @throws IllegalStateException if the processor has been closed
     */
    @SafeVarargs
    public final void add(T... tArr) {
        List<T> items = Arrays.asList(tArr);
        add(items);
    }

    /**
     * Splits the input into consecutive chunks of at most {@code batchSize} items and enqueues
     * each chunk in order.
     *
     * @param iterable items to enqueue
     * @throws IllegalStateException if the processor has been closed
     */
    public final void add(Iterable<T> iterable) {
        for (List<T> chunk : Iterables.partition(iterable, this.batchSize)) {
            add(chunk);
        }
    }

    /**
     * Offers one list of items to the queue. If the queue rejects it, the items are counted as
     * dropped; otherwise any dropping episode is ended and processing is scheduled when the
     * batch threshold has been reached.
     *
     * @param batch items to enqueue together
     * @throws IllegalStateException if the processor has been closed
     */
    private void add(List<T> batch) {
        ensureOpen();
        boolean accepted = this.queue.add(batch);
        if (accepted) {
            stoppedDrop();
            scheduleExecuteIfNeeded();
            return;
        }
        drop(batch.size());
    }

    /**
     * Records that {@code i} items were rejected by the queue.
     *
     * <p>The first drop of an episode stores a marker in {@code isDropping} and logs the
     * "start dropping" message once. The marker is the value of {@code droppedCount} at the start
     * of the episode <em>plus one</em>. The offset keeps the marker non-zero even when the episode
     * starts with a dropped count of 0, so 0 always means "not dropping". Without the offset the
     * very first episode stored 0, which logged the start message twice and made the recovery
     * message under-report or be skipped entirely.
     *
     * @param i number of items that were dropped
     */
    private void drop(int i) {
        if (this.isDropping.compareAndSet(0L, droppedCount() + 1L) && LOG.isInfoEnabled()) {
            LOG.info(this.startDroppingMessage);
        }
        this.droppedCount.getAndAdd(i);
    }

    /**
     * Ends the current dropping episode, if any, and logs how many items were dropped during it.
     *
     * <p>The marker is cleared with a compare-and-set so that, when several threads add
     * successfully at the same time, only one of them reports the recovery.
     */
    private void stoppedDrop() {
        long marker = this.isDropping.get();
        if (marker <= 0) {
            return;
        }
        if (!this.isDropping.compareAndSet(marker, 0L)) {
            // Another thread already ended this episode (or a new one has started).
            return;
        }
        // Undo the +1 offset applied in drop() to recover the count at episode start.
        long droppedAtStart = marker - 1L;
        long droppedInEpisode = droppedCount() - droppedAtStart;
        if (LOG.isInfoEnabled()) {
            LOG.info(String.format(this.stopDroppingMessage, Long.valueOf(droppedInEpisode)));
        }
    }

    /** Schedules a drain loop when the queue has accumulated at least one full batch. */
    private void scheduleExecuteIfNeeded() {
        if (!isAtOrAboveBatchSize()) {
            return;
        }
        tryScheduleExecuteUntilUnderLimit();
    }

    /**
     * Submits a drain loop ({@link #executeUntilUnderLimit()}) to the executor if a permit is
     * immediately available; otherwise does nothing, since enough loops are already scheduled.
     *
     * <p>The permit is taken with the non-blocking {@code tryAcquire()}, which never throws
     * {@code InterruptedException}. A producer thread whose interrupt flag is set therefore no
     * longer has {@code add()} fail after its items were already queued, and its interrupt status
     * is left untouched for the caller to handle.
     *
     * <p>The permit is released by the submitted task when it finishes. If the executor rejects
     * the task, the permit is released here before the exception propagates, so it cannot leak.
     */
    private void tryScheduleExecuteUntilUnderLimit() {
        if (!this.executeRunning.tryAcquire()) {
            return;
        }
        boolean submitted = false;
        try {
            this.executor.submit(new Runnable() { // from class: com.appiancorp.common.BatchedProcessor.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        BatchedProcessor.this.executeUntilUnderLimit();
                    } catch (Throwable th) {
                        BatchedProcessor.this.log(Level.ERROR, "Error submitting tasks for processing items", th);
                    } finally {
                        BatchedProcessor.this.executeRunning.release();
                    }
                }
            });
            submitted = true;
        } finally {
            if (!submitted) {
                // The task will never run, so it cannot give the permit back itself.
                this.executeRunning.release();
            }
        }
    }

    /**
     * @return {@code true} when the queue's accumulated size has reached {@code batchSize}
     */
    private boolean isAtOrAboveBatchSize() {
        int pending = this.queue.accumulatedSize();
        return pending >= this.batchSize;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Submits batches for processing until fewer than {@code batchSize} items remain accumulated. */
    public void executeUntilUnderLimit() {
        for (;;) {
            if (!isAtOrAboveBatchSize()) {
                return;
            }
            doExecute();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits for processing roughly what is accumulated at the time of the call.
     *
     * <p>The accumulated size is sampled once up front; batches are then submitted until at least
     * that many items have been handed off or the queue reports nothing accumulated. Items that
     * arrive while this runs are therefore not chased indefinitely.
     */
    public void executeCurrent() {
        final int target = this.queue.accumulatedSize();
        int handedOff = 0;
        while (handedOff < target && this.queue.accumulatedSize() > 0) {
            handedOff += doExecute();
        }
    }

    /**
     * Removes up to {@code batchSize} items from the queue and submits them to the executor,
     * attaching the configured callback to the resulting future.
     *
     * <p>Whether the batch function returns or throws, the queue is notified that the items have
     * been processed.
     *
     * @return the number of items submitted, or 0 if the queue had nothing to hand out
     */
    private int doExecute() {
        final ImmutableList<T> items = this.queue.removeForProcessing(this.batchSize);
        if (items.isEmpty()) {
            return 0;
        }
        final int count = items.size();
        if (LOG.isTraceEnabled()) {
            log(Level.TRACE, this.queue + " Submitting " + count + " items for processing...");
        }
        Callable<R> task = new Callable<R>() { // from class: com.appiancorp.common.BatchedProcessor.3
            @Override // java.util.concurrent.Callable
            public R call() throws Exception {
                try {
                    return BatchedProcessor.this.function.apply(items);
                } finally {
                    // Runs on both success and failure so the queue's bookkeeping always advances.
                    if (BatchedProcessor.LOG.isTraceEnabled()) {
                        BatchedProcessor.this.log(Level.TRACE, BatchedProcessor.this.queue + " Processed " + count + " items...");
                    }
                    BatchedProcessor.this.queue.notifyProcessed(count);
                }
            }
        };
        Futures.addCallback(this.decoratedExecutor.submit(task), this.callback, this.callbackExecutor);
        logExecutors();
        return count;
    }

    /**
     * Shuts the processor down: stops the periodic flush, waits for running drain loops, submits
     * whatever is still queued, then shuts down both executors, waiting up to one minute for each.
     *
     * <p>Only the first call does any work; the closed flag is flipped atomically, so concurrent
     * or repeated calls return immediately. If the calling thread is interrupted while waiting,
     * the wait is abandoned, the callback executor is still shut down so its threads can exit,
     * and the thread's interrupt status is restored.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // Atomic check-and-set: a separate get() followed by set() would let two threads both proceed.
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        if (this.flushFuture.isPresent()) {
            this.flushFuture.get().cancel(false);
        }
        if (this.flusher.isPresent()) {
            this.flusher.get().shutdown();
        }
        // Taking every permit waits for in-flight drain loops and prevents new ones from starting.
        this.executeRunning.acquireUninterruptibly(this.permits);
        while (this.queue.accumulatedSize() > 0) {
            doExecute();
        }
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(1L, TimeUnit.MINUTES);
            // Shut down only after the work executor has been given time to finish, so callbacks
            // for the final batches can still be handed to the callback executor.
            this.callbackExecutor.shutdown();
            this.callbackExecutor.awaitTermination(1L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            log(Level.DEBUG, "Interrupted during shutdown", e);
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // No-op if already shut down; otherwise this covers the path where the wait on the
            // work executor was interrupted and the callback executor would be left running.
            this.callbackExecutor.shutdown();
        }
    }

    /**
     * Guards operations that require an open processor.
     *
     * @throws IllegalStateException if {@link #close()} has already been called
     */
    private void ensureOpen() {
        boolean alreadyClosed = this.closed.get();
        if (!alreadyClosed) {
            return;
        }
        throw new IllegalStateException(this.queueName + " queue is closed");
    }

    /** Writes the string form of both executors to the log when TRACE logging is enabled. */
    private void logExecutors() {
        if (!LOG.isTraceEnabled()) {
            return;
        }
        log(Level.TRACE, "executor: " + this.executor);
        log(Level.TRACE, "callbackExecutor: " + this.callbackExecutor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs a message prefixed with the queue name in square brackets.
     *
     * @param level log4j level to log at
     * @param str   message text
     */
    public void log(Level level, String str) {
        String prefixed = "[" + this.queueName + "] " + str;
        LOG.log(level, prefixed);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs a message and its cause, prefixed with the queue name in square brackets.
     *
     * @param level log4j level to log at
     * @param str   message text
     * @param th    throwable to attach to the log entry
     */
    public void log(Level level, String str, Throwable th) {
        String prefixed = "[" + this.queueName + "] " + str;
        LOG.log(level, prefixed, th);
    }
}
