package com.appiancorp.cache;

import com.appiancorp.ap2.p.groupmsg.Constants;
import com.appiancorp.redisson.EventAckMessage;
import com.appiancorp.redisson.EventMessage;
import com.appiancorp.suite.SuiteConfiguration;
import com.appiancorp.suite.cfg.ConfigurationFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.connection.ConnectionManager;

/* loaded from: input_file:com/appiancorp/cache/RedisLocalCache.class */
public class RedisLocalCache extends RedisCacheBase {
    private static final String TOPIC_SUFFIX = ":acks";
    private static final String INTERRUPT_LABEL_VALUE = "interrupt";
    static final String TIMEOUT_LABEL_VALUE = "timeout";
    private final String hostname;
    private final RTopic updateTopic;
    private final ConcurrentHashMap<UUID, CountDownLatch> acknowledgements;
    private final AtomicInteger timeoutCounter;
    private final MessageListener<EventAckMessage> acknowledgementListener;
    private final MessageListener<EventMessage> cacheUpdateListener;
    private static final Logger LOG = Logger.getLogger(RedisLocalCache.class);
    private static final double[] REDIS_LATCH_RELEASE_BUCKETS = {0.002d, 0.005d, 0.01d, 0.02d, 0.04d, 0.08d, 0.16d, 0.5d, 1.0d};
    static final Counter putLatchTimeoutCounter = Counter.build().namespace("appian").subsystem("cache_service").name("redis_put_latch_failure_count").help("Number of times a local redis cache operation didn't receive acknowledgements from all subscribers").labelNames(new String[]{"type", "reason"}).register();
    private static final Histogram putLatchElapsedTime = Histogram.build().namespace("appian").subsystem("cache_service").name("redis_put_latch_duration_seconds_bucket").help("Elapsed time for local redis cache put latch releases").buckets(REDIS_LATCH_RELEASE_BUCKETS).labelNames(new String[]{"type"}).register();

    public RedisLocalCache(Properties properties, String str) {
        super(properties);
        this.acknowledgements = new ConcurrentHashMap<>();
        this.timeoutCounter = new AtomicInteger(0);
        this.acknowledgementListener = new MessageListener<EventAckMessage>() { // from class: com.appiancorp.cache.RedisLocalCache.1
            public void onMessage(CharSequence charSequence, EventAckMessage eventAckMessage) {
                EventMessage eventMessage = eventAckMessage.getEventMessage();
                if (RedisLocalCache.this.hostname.equals(eventMessage.getHostname())) {
                    CountDownLatch countDownLatch = (CountDownLatch) RedisLocalCache.this.acknowledgements.get(eventMessage.getUuid());
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                    if (RedisLocalCache.LOG.isDebugEnabled()) {
                        RedisLocalCache.LOG.debug(RedisLocalCache.this.getName() + " acknowledgement for " + eventMessage.getUuid() + " received from host: " + eventAckMessage.getHostname() + " latch " + (countDownLatch == null ? Constants.EXPIRED : "count=" + countDownLatch.getCount()));
                    }
                }
            }
        };
        this.cacheUpdateListener = new MessageListener<EventMessage>() { // from class: com.appiancorp.cache.RedisLocalCache.2
            public void onMessage(CharSequence charSequence, EventMessage eventMessage) {
                if (RedisLocalCache.this.hostname.equals(eventMessage.getHostname())) {
                    return;
                }
                try {
                    if (eventMessage.getEvent() == null) {
                        if (RedisLocalCache.LOG.isDebugEnabled()) {
                            RedisLocalCache.LOG.debug(RedisLocalCache.this.getName() + " " + eventMessage.getUuid() + " " + eventMessage.getHostname() + " has cleared the cache...clearing local cache");
                        }
                        RedisLocalCache.this.clearLocalCache();
                    } else {
                        Object removeFromLocalCache = RedisLocalCache.this.removeFromLocalCache(eventMessage.getEvent());
                        if (RedisLocalCache.LOG.isDebugEnabled()) {
                            RedisLocalCache.LOG.debug(RedisLocalCache.this.getName() + " " + eventMessage.getUuid() + " " + eventMessage.getHostname() + " updated " + eventMessage.getEvent() + ": removing from local cache (val=" + removeFromLocalCache + ")");
                        }
                    }
                } finally {
                    RedisLocalCache.this.updateTopic.publish(new EventAckMessage(eventMessage, RedisLocalCache.this.hostname));
                }
            }
        };
        this.hostname = str;
        this.updateTopic = this.client.getTopic(this.redissonMapCache.getName() + TOPIC_SUFFIX);
        this.updateTopic.addListener(EventAckMessage.class, this.acknowledgementListener);
        this.updateTopic.addListener(EventMessage.class, this.cacheUpdateListener);
        ((Counter.Child) putLatchTimeoutCounter.labels(new String[]{getName(), INTERRUPT_LABEL_VALUE})).inc(0.0d);
        ((Counter.Child) putLatchTimeoutCounter.labels(new String[]{getName(), "timeout"})).inc(0.0d);
    }

    private synchronized void dropOldConnectionEntry() {
        LOG.error("Clearing entry2PubSubConnection cache to force an up-to-date pubsub connection for " + getName());
        ConnectionManager connectionManager = this.client.getConnectionManager();
        connectionManager.getSubscribeService().remove(connectionManager.getEntry(this.redissonMapCache.getName() + TOPIC_SUFFIX));
    }

    RLocalCachedMap<Object, Object> getLocalCachedMap() {
        return this.redissonMapCache;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void clearLocalCache() {
        getLocalCachedMap().clearLocalCache();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public Object removeFromLocalCache(Serializable serializable) {
        return serializable instanceof Object[] ? Arrays.stream((Object[]) serializable).map(obj -> {
            return Boolean.valueOf(getLocalCachedMap().cachedKeySet().remove(obj));
        }).collect(Collectors.toList()) : Boolean.valueOf(getLocalCachedMap().cachedKeySet().remove(serializable));
    }

    @Override // com.appiancorp.cache.RedisCacheBase
    RMap<Object, Object> buildRedissonMap(String str, CacheAttributes cacheAttributes) {
        return this.client.getLocalCachedMap(str, LocalCachedMapOptions.defaults().cacheSize(this.maxCacheSize).evictionPolicy(LocalCachedMapOptions.EvictionPolicy.LRU).storeMode(LocalCachedMapOptions.StoreMode.LOCALCACHE_REDIS).storeCacheMiss(true).syncStrategy(LocalCachedMapOptions.SyncStrategy.NONE).reconnectionStrategy(LocalCachedMapOptions.ReconnectionStrategy.CLEAR));
    }

    private void distributedInvalidation(Serializable serializable) {
        long countSubscribers = this.updateTopic.countSubscribers() - 1;
        if (countSubscribers == 0) {
            return;
        }
        if (countSubscribers < 0) {
            LOG.error(getName() + " has lost all connections, 0 subscribers, reconnecting");
            dropOldConnectionEntry();
            countSubscribers = this.updateTopic.countSubscribers() - 1;
        }
        EventMessage eventMessage = new EventMessage(serializable, this.hostname);
        CountDownLatch countDownLatch = new CountDownLatch((int) countSubscribers);
        this.acknowledgements.put(eventMessage.getUuid(), countDownLatch);
        try {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + " sending invalidation message " + eventMessage.getUuid() + " for: " + serializable);
                }
                this.updateTopic.publish(eventMessage);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + " invalidation message " + eventMessage.getUuid() + " sent...waiting on latch for " + countSubscribers + " acks");
                }
                SuiteConfiguration suiteConfiguration = (SuiteConfiguration) ConfigurationFactory.getConfiguration(SuiteConfiguration.class);
                long redisLocalCachePutLatchTimeout = suiteConfiguration.getRedisLocalCachePutLatchTimeout();
                long currentTimeMillis = this.temporalWrapper.currentTimeMillis();
                boolean await = countDownLatch.await(redisLocalCachePutLatchTimeout, TimeUnit.MILLISECONDS);
                ((Histogram.Child) putLatchElapsedTime.labels(new String[]{getName()})).observe((this.temporalWrapper.currentTimeMillis() - currentTimeMillis) / 1000.0d);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + " latch " + eventMessage.getUuid() + " released count=" + countDownLatch.getCount() + "/" + countSubscribers + " timeout=" + (!await) + ": " + serializable);
                }
                if (await) {
                    this.timeoutCounter.getAndSet(0);
                } else {
                    ((Counter.Child) putLatchTimeoutCounter.labels(new String[]{getName(), "timeout"})).inc();
                    long redisLocalCacheTimeoutReconnectThreshold = suiteConfiguration.getRedisLocalCacheTimeoutReconnectThreshold();
                    if (this.timeoutCounter.incrementAndGet() > redisLocalCacheTimeoutReconnectThreshold) {
                        LOG.error("Put latch for " + getName() + " has timed out  more than " + redisLocalCacheTimeoutReconnectThreshold + "times");
                        dropOldConnectionEntry();
                        this.timeoutCounter.getAndSet(0);
                    }
                }
                this.acknowledgements.remove(eventMessage.getUuid());
            } catch (InterruptedException e) {
                LOG.info(getName() + " latch " + eventMessage.getUuid() + " interrupted: " + serializable);
                ((Counter.Child) putLatchTimeoutCounter.labels(new String[]{getName(), INTERRUPT_LABEL_VALUE})).inc();
                this.acknowledgements.remove(eventMessage.getUuid());
            }
        } catch (Throwable th) {
            this.acknowledgements.remove(eventMessage.getUuid());
            throw th;
        }
    }

    @Override // com.appiancorp.cache.RedisCacheBase
    public Object put(Object obj, Object obj2) {
        Object put = super.put(obj, obj2);
        distributedInvalidation((Serializable) obj);
        return put;
    }

    @Override // com.appiancorp.cache.RedisCacheBase
    public Object remove(Object obj) {
        Object remove = super.remove(obj);
        distributedInvalidation((Serializable) obj);
        return remove;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.appiancorp.cache.RedisCacheBase, com.appiancorp.cache.CacheBase
    public Object[] remove(Object[] objArr) {
        Object[] remove = super.remove(objArr);
        distributedInvalidation(objArr);
        return remove;
    }

    @Override // com.appiancorp.cache.RedisCacheBase
    public void clear() {
        super.clear();
        distributedInvalidation(null);
    }

    public void destroy() {
        this.updateTopic.removeAllListeners();
        this.redissonMapCache.delete();
        this.redissonMapCache.destroy();
    }
}
