package software.amazon.kinesis.multilang;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.multilang.messages.InitializeMessage;
import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:software/amazon/kinesis/multilang/MultiLangProtocol.class */
public class MultiLangProtocol {
    private static final Logger log = LoggerFactory.getLogger(MultiLangProtocol.class);
    private final InitializationInput initializationInput;
    private final Optional<Integer> timeoutInSeconds;
    private MessageReader messageReader;
    private MessageWriter messageWriter;
    private MultiLangDaemonConfiguration configuration;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:software/amazon/kinesis/multilang/MultiLangProtocol$FutureMethod.class */
    public interface FutureMethod<T> {
        T get() throws InterruptedException, TimeoutException, ExecutionException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, InitializationInput initializationInput, MultiLangDaemonConfiguration multiLangDaemonConfiguration) {
        this.messageReader = messageReader;
        this.messageWriter = messageWriter;
        this.initializationInput = initializationInput;
        this.configuration = multiLangDaemonConfiguration;
        this.timeoutInSeconds = Optional.ofNullable(multiLangDaemonConfiguration.getTimeoutInSeconds());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean initialize() {
        return waitForStatusMessage(InitializeMessage.ACTION, null, this.messageWriter.writeInitializeMessage(this.initializationInput));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean processRecords(ProcessRecordsInput processRecordsInput) {
        return waitForStatusMessage(ProcessRecordsMessage.ACTION, processRecordsInput.checkpointer(), this.messageWriter.writeProcessRecordsMessage(processRecordsInput));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean leaseLost(LeaseLostInput leaseLostInput) {
        return waitForStatusMessage(LeaseLostMessage.ACTION, null, this.messageWriter.writeLeaseLossMessage(leaseLostInput));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shardEnded(ShardEndedInput shardEndedInput) {
        return waitForStatusMessage(ShardEndedMessage.ACTION, shardEndedInput.checkpointer(), this.messageWriter.writeShardEndedMessage(shardEndedInput));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shutdownRequested(RecordProcessorCheckpointer recordProcessorCheckpointer) {
        return waitForStatusMessage(ShutdownRequestedMessage.ACTION, recordProcessorCheckpointer, this.messageWriter.writeShutdownRequestedMessage());
    }

    private boolean waitForStatusMessage(String str, RecordProcessorCheckpointer recordProcessorCheckpointer, Future<Boolean> future) {
        try {
            return waitForStatusMessage(str, recordProcessorCheckpointer) && future.get().booleanValue();
        } catch (InterruptedException e) {
            log.error("Interrupted while writing {} message for shard {}", str, this.initializationInput.shardId());
            return false;
        } catch (ExecutionException e2) {
            log.error("Failed to write {} message for shard {}", new Object[]{str, this.initializationInput.shardId(), e2});
            return false;
        }
    }

    boolean waitForStatusMessage(String str, RecordProcessorCheckpointer recordProcessorCheckpointer) {
        Optional empty = Optional.empty();
        while (true) {
            Optional optional = empty;
            if (optional.isPresent()) {
                return validateStatusMessage((StatusMessage) optional.get(), str);
            }
            Future<Message> nextMessageFromSTDOUT = this.messageReader.getNextMessageFromSTDOUT();
            Optional<U> map = this.timeoutInSeconds.map(num -> {
                return futureMethod(() -> {
                    return (Message) nextMessageFromSTDOUT.get(num.intValue(), TimeUnit.SECONDS);
                }, str);
            });
            nextMessageFromSTDOUT.getClass();
            Optional optional2 = (Optional) map.orElse(futureMethod(nextMessageFromSTDOUT::get, str));
            if (!optional2.isPresent() || ((Boolean) optional2.filter(message -> {
                return message instanceof CheckpointMessage;
            }).map(message2 -> {
                return (CheckpointMessage) message2;
            }).flatMap(checkpointMessage -> {
                return futureMethod(() -> {
                    return checkpoint(checkpointMessage, recordProcessorCheckpointer).get();
                }, "Checkpoint");
            }).map(bool -> {
                return Boolean.valueOf(!bool.booleanValue());
            }).orElse(false)).booleanValue()) {
                return false;
            }
            empty = optional2.filter(message3 -> {
                return message3 instanceof StatusMessage;
            }).map(message4 -> {
                return (StatusMessage) message4;
            });
        }
    }

    private <T> Optional<T> futureMethod(FutureMethod<T> futureMethod, String str) {
        try {
            return Optional.of(futureMethod.get());
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for {} message for shard {}", new Object[]{str, this.initializationInput.shardId(), e});
            return Optional.empty();
        } catch (ExecutionException e2) {
            log.error("Failed to get status message for {} action for shard {}", new Object[]{str, this.initializationInput.shardId(), e2});
            return Optional.empty();
        } catch (TimeoutException e3) {
            log.error("Timedout to get status message for {} action for shard {}. Terminating...", new Object[]{str, this.initializationInput.shardId(), e3});
            haltJvm(1);
            return Optional.empty();
        }
    }

    protected void haltJvm(int i) {
        Runtime.getRuntime().halt(i);
    }

    private boolean validateStatusMessage(StatusMessage statusMessage, String str) {
        log.info("Received response {} from subprocess while waiting for {} while processing shard {}", new Object[]{statusMessage, str, this.initializationInput.shardId()});
        return (statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor().equals(str)) ? false : true;
    }

    private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, RecordProcessorCheckpointer recordProcessorCheckpointer) {
        String sequenceNumber = checkpointMessage.getSequenceNumber();
        Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
        try {
            if (recordProcessorCheckpointer == null) {
                String format = String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s", sequenceNumber, this.initializationInput.shardId());
                log.error(format);
                return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, new InvalidStateException(format));
            }
            log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber));
            if (sequenceNumber == null) {
                recordProcessorCheckpointer.checkpoint();
            } else if (subSequenceNumber != null) {
                recordProcessorCheckpointer.checkpoint(sequenceNumber, subSequenceNumber.longValue());
            } else {
                recordProcessorCheckpointer.checkpoint(sequenceNumber);
            }
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
        } catch (Throwable th) {
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, th);
        }
    }

    private String logCheckpointMessage(String str, Long l) {
        return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s", this.initializationInput.shardId(), str, l);
    }
}
