package com.appiancorp.cache.persist;

import com.appian.kafka.KafkaConsumerProcessor;
import com.appian.kafka.KafkaTopicManager;
import com.appiancorp.cache.persist.MessageBroadcaster;
import com.appiancorp.cache.persist.metrics.MessageBroadcasterKafkaMetrics;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appiancorp/cache/persist/MessageBroadcasterKafkaConsumer.class */
/**
 * Kafka consumer processor that forwards every received {@link BroadcastMessageImpl}
 * to a {@link BroadcastMessageHandler}.
 *
 * <p>An instance registers itself with the supplied {@link KafkaTopicManager} during
 * construction, using the Kafka topic name derived from its
 * {@link MessageBroadcaster.TopicType}.
 */
public class MessageBroadcasterKafkaConsumer implements KafkaConsumerProcessor<BroadcastMessageImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(MessageBroadcasterKafkaConsumer.class);

    /** Topic type this consumer was created for; its Kafka topic name is used for registration and logging. */
    final MessageBroadcaster.TopicType topicType;

    /** Receives each message handed to {@link #processMessages(List)}. */
    private final BroadcastMessageHandler broadcastMessageHandler;

    /**
     * Creates the consumer and registers it as a broadcast consumer on the given topic manager.
     *
     * @param kafkaTopicManager manager on which {@code registerBroadcastConsumer} is invoked
     * @param messageBroadcasterKafkaConsumerConfig supplies the high-water-mark queue size and time (seconds)
     * @param messageBroadcasterKafkaMetrics metrics object passed through to the registration call
     * @param topicType topic type whose Kafka topic name is consumed; must not be {@code null}
     * @param broadcastMessageHandler handler invoked for every processed message; must not be {@code null}
     * @throws NullPointerException if {@code topicType} or {@code broadcastMessageHandler} is {@code null}
     */
    public MessageBroadcasterKafkaConsumer(KafkaTopicManager kafkaTopicManager, MessageBroadcasterKafkaConsumerConfig messageBroadcasterKafkaConsumerConfig, MessageBroadcasterKafkaMetrics messageBroadcasterKafkaMetrics, MessageBroadcaster.TopicType topicType, BroadcastMessageHandler broadcastMessageHandler) {
        if (topicType == null) {
            throw new NullPointerException("MessageBroadcaster.TopicType tt");
        }
        this.topicType = topicType;
        if (broadcastMessageHandler == null) {
            throw new NullPointerException("BroadcastMessageHandler bmh");
        }
        this.broadcastMessageHandler = broadcastMessageHandler;

        String kafkaTopicName = this.topicType.toKafkaTopicName();
        // The topic name is deliberately passed for both of the first two arguments,
        // exactly as before. NOTE(review): the meaning of the literal 100 is not visible
        // from this file - confirm against KafkaTopicManager.registerBroadcastConsumer.
        // Both fields are assigned before `this` is handed to the manager.
        kafkaTopicManager.registerBroadcastConsumer(
                kafkaTopicName,
                kafkaTopicName,
                100,
                messageBroadcasterKafkaConsumerConfig.getHighWaterMarkQueueSize(),
                messageBroadcasterKafkaConsumerConfig.getHighWaterMarkTimeSec(),
                messageBroadcasterKafkaMetrics,
                this);
    }

    /**
     * Passes each message, in list order, to the configured {@link BroadcastMessageHandler}.
     * Any exception thrown by the handler propagates to the caller.
     *
     * @param list messages to process
     * @return the number of messages in {@code list}
     */
    public int processMessages(List<BroadcastMessageImpl> list) {
        // Guard kept so the topic name is only computed when debug logging is on.
        if (LOG.isDebugEnabled()) {
            // Fixed: the previous message lacked a space and rendered as "processing 5messages for ...".
            LOG.debug("processing {} messages for {}", list.size(), this.topicType.toKafkaTopicName());
        }
        for (BroadcastMessageImpl message : list) {
            this.broadcastMessageHandler.onMessage(message);
        }
        return list.size();
    }

    /**
     * Logs a warning containing the topic name and the number of messages in {@code list}.
     * The messages themselves are not inspected or re-processed here.
     *
     * @param list messages reported as dead-lettered
     */
    public void onDeadLetteringDataItems(List<BroadcastMessageImpl> list) {
        // Topic name is passed as an argument rather than concatenated into the format
        // string, so a "{}" inside the name cannot disturb placeholder substitution.
        LOG.warn("{}: Kafka failed to process messages and is committing {} messages to unblock the queue",
                this.topicType.toKafkaTopicName(), list.size());
    }

    /**
     * @return {@code BroadcastMessageImpl.class}, the message type this processor handles
     */
    public Class<BroadcastMessageImpl> getSupportedMessageType() {
        return BroadcastMessageImpl.class;
    }
}
