package org.thingsboard.server.queue.sqs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;

/* loaded from: input_file:org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate.class */
public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<Message, T> {
    private static final Logger log = LoggerFactory.getLogger(TbAwsSqsConsumerTemplate.class);
    private static final int MAX_NUM_MSGS = 10;
    private final Gson gson;
    private final TbQueueAdmin admin;
    private final AmazonSQS sqsClient;
    private final TbQueueMsgDecoder<T> decoder;
    private final TbAwsSqsSettings sqsSettings;
    private final List<AwsSqsMsgWrapper> pendingMessages;
    private volatile Set<String> queueUrls;

    /* loaded from: input_file:org/thingsboard/server/queue/sqs/TbAwsSqsConsumerTemplate$AwsSqsMsgWrapper.class */
    /**
     * Pairs a queue URL with a list of messages received from it.
     *
     * <p>{@code poll} adds one wrapper per non-empty receive result to the pending list, and
     * {@code doCommit} reads the URL and messages back to build a delete batch.
     */
    private static class AwsSqsMsgWrapper {
        private final String url;
        private final List<Message> messages;

        /**
         * @param url      queue URL the messages were received from
         * @param messages messages returned by a single receive call
         */
        public AwsSqsMsgWrapper(String url, List<Message> messages) {
            this.url = url;
            this.messages = messages;
        }

        /** @return the queue URL supplied at construction */
        public String getUrl() {
            return url;
        }

        /** @return the message list supplied at construction (not copied) */
        public List<Message> getMessages() {
            return messages;
        }

        /**
         * Two wrappers are equal when both the URL and the message list are equal
         * (null-safe), and the other side agrees via {@link #canEqual(Object)}.
         */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof AwsSqsMsgWrapper)) {
                return false;
            }
            AwsSqsMsgWrapper that = (AwsSqsMsgWrapper) other;
            return that.canEqual(this)
                    && Objects.equals(getUrl(), that.getUrl())
                    && Objects.equals(getMessages(), that.getMessages());
        }

        /** Hook that lets a subclass refuse equality with instances of this class. */
        protected boolean canEqual(Object obj) {
            return obj instanceof AwsSqsMsgWrapper;
        }

        /** Hash over URL and messages; a null component contributes the constant 43. */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            String currentUrl = getUrl();
            result = result * prime + (currentUrl == null ? 43 : currentUrl.hashCode());
            List<Message> currentMessages = getMessages();
            result = result * prime + (currentMessages == null ? 43 : currentMessages.hashCode());
            return result;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TbAwsSqsConsumerTemplate.AwsSqsMsgWrapper(url=");
            text.append(getUrl());
            text.append(", messages=");
            text.append(getMessages());
            text.append(")");
            return text.toString();
        }
    }

    public TbAwsSqsConsumerTemplate(TbQueueAdmin tbQueueAdmin, TbAwsSqsSettings tbAwsSqsSettings, String str, TbQueueMsgDecoder<T> tbQueueMsgDecoder) {
        super(str);
        this.gson = new Gson();
        this.pendingMessages = new CopyOnWriteArrayList();
        this.admin = tbQueueAdmin;
        this.decoder = tbQueueMsgDecoder;
        this.sqsSettings = tbAwsSqsSettings;
        this.sqsClient = (AmazonSQS) AmazonSQSClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(tbAwsSqsSettings.getAccessKeyId(), tbAwsSqsSettings.getSecretAccessKey()))).withRegion(tbAwsSqsSettings.getRegion()).build();
    }

    /**
     * Resolves every topic name to its queue URL, stores the resulting set, and calls
     * {@code initNewExecutor} with {@code urls * threadsPerTopic + 1}.
     *
     * @param list topic names to subscribe to
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doSubscribe(List<String> list) {
        Set<String> resolvedUrls = list.stream()
                .map(this::getQueueUrl)
                .collect(Collectors.toSet());
        this.queueUrls = resolvedUrls;

        int threadCount = resolvedUrls.size() * this.sqsSettings.getThreadsPerTopic() + 1;
        initNewExecutor(threadCount);
    }

    /**
     * Polls every subscribed queue URL and returns all received messages as one flat list.
     *
     * <p>The duration is converted to whole seconds (sub-second values truncate to 0) and used
     * as the wait time of each receive request. The call blocks until all per-queue futures
     * complete.
     *
     * @param j poll duration in milliseconds
     * @return the non-null messages from all queues, or an empty list if waiting was
     *         interrupted or a receive task failed
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected List<Message> doPoll(long j) {
        int waitSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(j);
        try {
            List<ListenableFuture<List<Message>>> perQueue = this.queueUrls.stream()
                    .map(queueUrl -> poll(queueUrl, waitSeconds))
                    .collect(Collectors.toList());
            return Futures.allAsList(perQueue).get().stream()
                    .flatMap(List::stream)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            if (this.stopped) {
                log.info("[{}] Aws SQS consumer is stopped.", getTopic());
            } else {
                log.error("Failed to poll messages.", e);
            }
            return Collections.emptyList();
        }
    }

    /**
     * Parses the message body as JSON into a {@code DefaultTbQueueMsg} and hands it to the
     * configured decoder.
     *
     * @param message received SQS message whose body holds the JSON payload
     * @return the value produced by the decoder
     * @throws InvalidProtocolBufferException declared by the overridden method; propagated
     *         from the decoder
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    public T decode(Message message) throws InvalidProtocolBufferException {
        String body = message.getBody();
        DefaultTbQueueMsg queueMsg = this.gson.fromJson(body, DefaultTbQueueMsg.class);
        return this.decoder.decode(queueMsg);
    }

    /**
     * Submits one batch-delete task per pending wrapper to the consumer executor, then clears
     * the pending list.
     *
     * <p>The tasks run asynchronously and their futures are not awaited, so failures are
     * reported through the log: entries the service lists as failed are logged individually,
     * and an exception thrown while building or sending the batch is logged with its stack
     * trace instead of being lost inside the discarded future.
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doCommit() {
        this.pendingMessages.forEach(wrapper -> this.consumerExecutor.submit(() -> {
            String queueUrl = wrapper.getUrl();
            try {
                List<DeleteMessageBatchRequestEntry> entries = wrapper.getMessages().stream()
                        .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
                        .collect(Collectors.toList());
                // A batch delete can succeed as a call while individual entries fail.
                this.sqsClient.deleteMessageBatch(queueUrl, entries).getFailed().forEach(failed ->
                        log.warn("[{}] Failed to delete message [{}]: {}", queueUrl, failed.getId(), failed.getMessage()));
            } catch (RuntimeException e) {
                log.error("[{}] Failed to delete messages.", queueUrl, e);
            }
        }));
        this.pendingMessages.clear();
    }

    /**
     * Marks the consumer as stopped, shuts down the SQS client if one is present, and then
     * shuts down the executor.
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doUnsubscribe() {
        this.stopped = true;

        AmazonSQS client = this.sqsClient;
        if (client != null) {
            client.shutdown();
        }

        shutdownExecutor();
    }

    /**
     * Issues {@code threadsPerTopic} parallel receive requests against one queue and combines
     * their results.
     *
     * <p>Each non-empty receive result is also recorded in {@code pendingMessages} together
     * with the queue URL, so that {@code doCommit} can delete those messages later.
     *
     * @param queueUrl    URL of the queue to receive from
     * @param waitSeconds wait time, in seconds, set on each receive request
     * @return a future holding the concatenated messages of all receive requests; an empty
     *         list when there are no results
     */
    private ListenableFuture<List<Message>> poll(String queueUrl, int waitSeconds) {
        List<ListenableFuture<List<Message>>> receiveFutures = new ArrayList<>();
        for (int worker = 0; worker < this.sqsSettings.getThreadsPerTopic(); worker++) {
            receiveFutures.add(this.consumerExecutor.submit(() -> {
                ReceiveMessageRequest request = new ReceiveMessageRequest()
                        .withWaitTimeSeconds(waitSeconds)
                        .withQueueUrl(queueUrl)
                        .withMaxNumberOfMessages(MAX_NUM_MSGS);
                return this.sqsClient.receiveMessage(request).getMessages();
            }));
        }
        return Futures.transform(Futures.allAsList(receiveFutures), batches -> {
            if (CollectionUtils.isEmpty(batches)) {
                return Collections.emptyList();
            }
            return batches.stream().flatMap(batch -> {
                if (batch.isEmpty()) {
                    return Stream.empty();
                }
                // Remember what was received from which queue so it can be deleted on commit.
                this.pendingMessages.add(new AwsSqsMsgWrapper(queueUrl, batch));
                return batch.stream();
            }).collect(Collectors.toList());
        }, this.consumerExecutor);
    }

    /**
     * Asks the admin to create the topic if it does not exist, then looks up the URL of the
     * matching queue.
     *
     * <p>The queue name is the topic with every {@code .} replaced by {@code _} and the
     * suffix {@code .fifo} appended.
     *
     * @param str topic name
     * @return the queue URL reported by the SQS client
     */
    private String getQueueUrl(String str) {
        this.admin.createTopicIfNotExists(str);
        String fifoQueueName = str.replaceAll("\\.", "_") + ".fifo";
        return this.sqsClient.getQueueUrl(fifoQueueName).getQueueUrl();
    }
}
