package com.appian.komodo.client.handler;

import com.appian.komodo.api.EngineResponse;
import com.appian.komodo.api.KipcConstants;
import com.appian.komodo.api.exceptions.ConnectionClosedException;
import com.appian.komodo.api.exceptions.KougarException;
import com.appian.komodo.client.CancelRequestMessage;
import com.appian.komodo.client.EngineCall;
import com.appian.komodo.client.EngineSubchannel;
import com.appian.komodo.client.RequestNotProcessedException;
import com.appian.komodo.config.EngineId;
import com.appian.komodo.logging.CallTraceLogger;
import com.appian.komodo.util.ByteKey;
import com.appian.komodo.util.UUIDUtils;
import com.appian.komodo.util.io.SocketUtils;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import komodo.shaded.com.google.common.annotations.VisibleForTesting;
import komodo.shaded.com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appian/komodo/client/handler/EngineResponseHandler.class */
public class EngineResponseHandler extends ChannelDuplexHandler {
    private static final long ENGINE_CALL_MAP_MAX_EXPECTED_AGE_MS = 420000;
    private final ConcurrentMap<ByteKey, EngineCallMapEntry> engineCallMap = new ConcurrentHashMap();
    private final Boolean retryEngineRequests;
    private static final Logger LOGGER = LoggerFactory.getLogger(EngineResponseHandler.class);
    private static final CallTraceLogger TRACE_LOGGER = CallTraceLogger.getLogger(LOGGER);
    private static final Joiner COMMA_JOINER = Joiner.on(",");
    private static final int ENGINE_CALL_MAP_MAX_EXPECTED_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    private static final ByteKey ENGINE_RESTART_MESSAGE_BYTEKEY = new ByteKey(KipcConstants.ENGINE_RESTART_MESSAGE_CORRELATION_ID);

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:com/appian/komodo/client/handler/EngineResponseHandler$EngineCallMapEntry.class */
    public static class EngineCallMapEntry {
        private final EngineCall engineCall;
        private final long insertionTime;
        private final ByteKey key;

        private EngineCallMapEntry(EngineCall engineCall) {
            this(engineCall, System.currentTimeMillis());
        }

        private EngineCallMapEntry(EngineCall engineCall, long j) {
            this.engineCall = engineCall;
            this.insertionTime = j;
            this.key = new ByteKey(engineCall.getEngineRequest().getCorrelationId());
        }

        public EngineCall getEngineCall() {
            return this.engineCall;
        }

        public ByteKey getKey() {
            return this.key;
        }

        long getInsertionTime() {
            return this.insertionTime;
        }
    }

    public EngineResponseHandler(Boolean bool) {
        this.retryEngineRequests = bool;
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (this.retryEngineRequests.booleanValue() && (obj instanceof ChannelInputShutdownEvent)) {
            handleEngineRestarting(channelHandlerContext);
        } else {
            super.userEventTriggered(channelHandlerContext, obj);
        }
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
        EngineId engineIdFromContext = engineIdFromContext(channelHandlerContext);
        ScheduledFuture scheduleWithFixedDelay = channelHandlerContext.channel().eventLoop().scheduleWithFixedDelay(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            if (this.engineCallMap.size() > ENGINE_CALL_MAP_MAX_EXPECTED_SIZE) {
                LOGGER.warn("{} outstanding calls for {} at {}", new Object[]{Integer.valueOf(this.engineCallMap.size()), engineIdFromContext, remoteAddress});
            }
            for (EngineCallMapEntry engineCallMapEntry : this.engineCallMap.values()) {
                long insertionTime = currentTimeMillis - engineCallMapEntry.getInsertionTime();
                if (insertionTime > ENGINE_CALL_MAP_MAX_EXPECTED_AGE_MS) {
                    LOGGER.warn("EngineCall {} has not been completed after {} ms [{}]", new Object[]{UUIDUtils.byteArrayToUUID(engineCallMapEntry.getEngineCall().getEngineRequest().getCorrelationId()), Long.valueOf(insertionTime), engineCallMapEntry.getEngineCall().getEngineRequest().getRedactedEngineRequest()});
                }
            }
        }, 15L, 15L, TimeUnit.MINUTES);
        channelHandlerContext.channel().closeFuture().addListener(future -> {
            scheduleWithFixedDelay.cancel(false);
        });
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (obj instanceof EngineCall) {
            handleEngineCall(channelHandlerContext, (EngineCall) obj, channelPromise);
        } else if (obj instanceof CancelRequestMessage) {
            handleCancelRequest(channelHandlerContext, (CancelRequestMessage) obj);
        } else {
            super.write(channelHandlerContext, obj, channelPromise);
        }
    }

    private void handleEngineCall(ChannelHandlerContext channelHandlerContext, EngineCall engineCall, ChannelPromise channelPromise) throws Exception {
        TRACE_LOGGER.trace("Handling EngineCall", engineCall.getEngineRequest());
        EngineCallMapEntry engineCallMapEntry = new EngineCallMapEntry(engineCall);
        EngineCallMapEntry put = this.engineCallMap.put(engineCallMapEntry.getKey(), engineCallMapEntry);
        if (put != null) {
            UUID byteArrayToUUID = UUIDUtils.byteArrayToUUID(engineCall.getEngineRequest().getCorrelationId());
            LOGGER.error("EngineRequest with id {} was already being tracked.  This may result in unknown behavior. [oldRequest: {}, newRequest:{}]", new Object[]{byteArrayToUUID, put.getEngineCall().getEngineRequest().getRedactedEngineRequest(), engineCall.getEngineRequest().getRedactedEngineRequest()});
            put.getEngineCall().getSubchannelPromise().tryFailure(new KougarException(String.format("Received request with colliding uuid %s.", byteArrayToUUID)));
        }
        engineCall.getSubchannelPromise().addListener(future -> {
            TRACE_LOGGER.trace("Removing key from engineCallMap for EngineCall", engineCall.getEngineRequest());
            this.engineCallMap.remove(engineCallMapEntry.getKey());
        });
        super.write(channelHandlerContext, engineCall.getEngineRequest(), channelPromise);
    }

    private void handleCancelRequest(ChannelHandlerContext channelHandlerContext, CancelRequestMessage cancelRequestMessage) {
        EngineCallMapEntry remove = this.engineCallMap.remove(cancelRequestMessage.getKey());
        if (remove != null) {
            if (LOGGER.isTraceEnabled()) {
                TRACE_LOGGER.trace(String.format("Cancelling Enginecall to %s", channelHandlerContext.channel().remoteAddress()), remove.getEngineCall().getEngineRequest());
            }
            remove.getEngineCall().getSubchannelPromise().cancel(true);
        } else if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Attempted to cancel engine call {} to {} at {}, but a response had already been received", new Object[]{engineIdFromContext(channelHandlerContext), channelHandlerContext.channel().remoteAddress(), cancelRequestMessage.getKey().toString()});
        }
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        EngineResponse engineResponse = (EngineResponse) obj;
        TRACE_LOGGER.trace("Handling EngineResponse", engineResponse);
        ByteKey byteKey = new ByteKey(engineResponse.getCorrelationId());
        EngineCallMapEntry engineCallMapEntry = this.engineCallMap.get(byteKey);
        if (engineCallMapEntry != null) {
            engineCallMapEntry.getEngineCall().getSubchannelPromise().trySuccess(engineResponse);
            return;
        }
        EngineId engineIdFromContext = engineIdFromContext(channelHandlerContext);
        if (!byteKey.equals(ENGINE_RESTART_MESSAGE_BYTEKEY) || !this.retryEngineRequests.booleanValue()) {
            LOGGER.warn("Engine response for {} did not correlate with an outstanding request: {}", engineIdFromContext, engineResponse.toBuilder().setValue("<payload redacted>").build());
        } else {
            LOGGER.info("Engine restart message received for {} on client..", engineIdFromContext);
            handleEngineRestarting(channelHandlerContext);
        }
    }

    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelUnregistered(channelHandlerContext);
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Unregistering channel for {}, clearing out correlation ids: [{}]", channelHandlerContext.channel().remoteAddress(), COMMA_JOINER.join(this.engineCallMap.keySet()));
        }
        for (EngineCallMapEntry engineCallMapEntry : this.engineCallMap.values()) {
            engineCallMapEntry.getEngineCall().getSubchannelPromise().tryFailure(new ConnectionClosedException(engineIdFromContext(channelHandlerContext), engineCallMapEntry.engineCall.getCallName()));
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        EngineId engineIdFromContext = engineIdFromContext(channelHandlerContext);
        if (Boolean.TRUE.equals(channelHandlerContext.channel().config().getOption(ChannelOption.ALLOW_HALF_CLOSURE)) && SocketUtils.isClientDisconnectError(th)) {
            LOGGER.debug("Connection loss exception for {}: {}", engineIdFromContext, th);
        } else {
            LOGGER.error("Unhandled exception for {}: {}", engineIdFromContext, th);
        }
        super.exceptionCaught(channelHandlerContext, th);
    }

    private void handleEngineRestarting(ChannelHandlerContext channelHandlerContext) {
        LOGGER.info("Server at {} is shutting down, retrying {} request(s)", channelHandlerContext.channel().remoteAddress(), Integer.valueOf(this.engineCallMap.size()));
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Server at {} is terminating the connection, retrying correlation ids: [{}]", channelHandlerContext.channel().remoteAddress(), COMMA_JOINER.join(this.engineCallMap.keySet()));
        }
        ((EngineSubchannel.PrepareToCloseCommand) channelHandlerContext.channel().attr(EngineSubchannel.PREPARE_TO_CLOSE_KEY).getAndSet((Object) null)).execute();
        Iterator<EngineCallMapEntry> it = this.engineCallMap.values().iterator();
        while (it.hasNext()) {
            it.next().getEngineCall().getSubchannelPromise().tryFailure(new RequestNotProcessedException());
        }
        channelHandlerContext.close();
    }

    private EngineId engineIdFromContext(ChannelHandlerContext channelHandlerContext) {
        return (EngineId) channelHandlerContext.channel().attr(EngineSubchannel.ENGINE_ID_KEY).get();
    }
}
