/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.sqs;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;

/**
 * Queue consumer backed by Amazon SQS.
 *
 * <p>Each subscribed topic is resolved to the URL of a FIFO queue. Every poll fans out
 * {@code threadsPerTopic} concurrent receive requests per queue URL on the inherited
 * {@code consumerExecutor}. Received batches are remembered in {@link #pendingMessages}
 * until {@link #doCommit()} deletes them from SQS.
 *
 * @param <T> the decoded message type handed to the caller
 */
public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg>
        extends AbstractParallelTbQueueConsumerTemplate<Message, T> {
    private static final Logger log = LoggerFactory.getLogger(TbAwsSqsConsumerTemplate.class);
    /** Number of messages requested per receive call (SQS accepts 1-10). */
    private static final int MAX_NUM_MSGS = 10;
    /** Upper bound SQS accepts for the long-polling wait time, in seconds. */
    private static final int MAX_WAIT_TIME_SECONDS = 20;
    private final Gson gson = new Gson();
    private final TbQueueAdmin admin;
    private final AmazonSQS sqsClient;
    private final TbQueueMsgDecoder<T> decoder;
    private final TbAwsSqsSettings sqsSettings;
    /** Batches received but not yet deleted from SQS; appended to from executor threads. */
    private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>();
    private volatile Set<String> queueUrls;

    /**
     * Creates the consumer and builds its SQS client.
     *
     * @param admin       used to create the queue for a topic before its URL is looked up
     * @param sqsSettings credentials, region and threads-per-topic configuration
     * @param topic       topic name passed to the parent template
     * @param decoder     converts the generic queue message into {@code T}
     */
    public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) {
        super(topic);
        this.admin = admin;
        this.decoder = decoder;
        this.sqsSettings = sqsSettings;
        // Typed as the interface: either branch may supply the provider.
        AWSCredentialsProvider credentialsProvider;
        if (sqsSettings.getUseDefaultCredentialProviderChain()) {
            credentialsProvider = new DefaultAWSCredentialsProviderChain();
        } else {
            BasicAWSCredentials awsCredentials =
                    new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
            credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
        }
        this.sqsClient = AmazonSQSClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(sqsSettings.getRegion())
                .build();
    }

    /**
     * Resolves every topic name to its queue URL and (re)creates the executor.
     *
     * @param topicNames topics to subscribe to
     */
    @Override
    protected void doSubscribe(List<String> topicNames) {
        this.queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
        // One thread per concurrent receive request plus one extra
        // (presumably for the result-merging callbacks -- TODO confirm).
        initNewExecutor(this.queueUrls.size() * this.sqsSettings.getThreadsPerTopic() + 1);
    }

    /**
     * Polls all subscribed queues in parallel and returns the received messages.
     *
     * @param durationInMillis requested poll duration; converted to whole seconds and
     *                         clamped to the 0-20 second range SQS accepts as wait time
     * @return all received messages, or an empty list when polling failed or was interrupted
     */
    @Override
    protected List<Message> doPoll(long durationInMillis) {
        long requestedSeconds = TimeUnit.MILLISECONDS.toSeconds(durationInMillis);
        int waitSeconds = (int) Math.max(0L, Math.min((long) MAX_WAIT_TIME_SECONDS, requestedSeconds));
        List<ListenableFuture<List<Message>>> futureList = this.queueUrls.stream()
                .map(url -> poll(url, waitSeconds))
                .collect(Collectors.toList());
        ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
        try {
            return futureResult.get().stream()
                    .flatMap(Collection::stream)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the flag so callers further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
            }
            if (this.stopped) {
                log.info("[{}] Aws SQS consumer is stopped.", getTopic());
            } else {
                log.error("Failed to pool messages.", e);
            }
            return Collections.emptyList();
        }
    }

    /**
     * Parses the SQS message body as JSON into a {@link DefaultTbQueueMsg} and decodes it.
     *
     * @param message raw SQS message
     * @return the decoded message
     * @throws InvalidProtocolBufferException if the decoder rejects the payload
     */
    @Override
    public T decode(Message message) throws InvalidProtocolBufferException {
        DefaultTbQueueMsg msg = this.gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
        return this.decoder.decode(msg);
    }

    /**
     * Schedules deletion of every pending batch from SQS and forgets those batches.
     */
    @Override
    protected void doCommit() {
        // Work on a snapshot and remove only the snapshot afterwards: a plain clear() could
        // drop batches appended concurrently by a poll callback, leaving them undeleted.
        List<AwsSqsMsgWrapper> toCommit = new ArrayList<>(this.pendingMessages);
        if (toCommit.isEmpty()) {
            return;
        }
        toCommit.forEach(wrapper -> this.consumerExecutor.submit(() -> deleteBatch(wrapper)));
        this.pendingMessages.removeAll(toCommit);
    }

    /**
     * Marks the consumer as stopped, shuts down the SQS client and then the executor.
     */
    @Override
    protected void doUnsubscribe() {
        this.stopped = true;
        if (this.sqsClient != null) {
            this.sqsClient.shutdown();
        }
        shutdownExecutor();
    }

    /**
     * Issues {@code threadsPerTopic} concurrent receive requests against one queue.
     *
     * @param url             queue URL to read from
     * @param waitTimeSeconds long-polling wait time for each request
     * @return future completing with the concatenation of all received batches
     */
    private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) {
        int threads = this.sqsSettings.getThreadsPerTopic();
        List<ListenableFuture<List<Message>>> result = new ArrayList<>();
        for (int i = 0; i < threads; ++i) {
            result.add(this.consumerExecutor.submit(() -> {
                ReceiveMessageRequest request = new ReceiveMessageRequest()
                        .withWaitTimeSeconds(waitTimeSeconds)
                        .withQueueUrl(url)
                        .withMaxNumberOfMessages(MAX_NUM_MSGS);
                return this.sqsClient.receiveMessage(request).getMessages();
            }));
        }
        return Futures.transform(Futures.allAsList(result),
                batches -> trackAndFlatten(url, batches),
                this.consumerExecutor);
    }

    /**
     * Records each non-empty batch as pending for the given queue and flattens the batches.
     */
    private List<Message> trackAndFlatten(String url, List<List<Message>> batches) {
        if (CollectionUtils.isEmpty(batches)) {
            return Collections.emptyList();
        }
        List<Message> messages = new ArrayList<>();
        for (List<Message> batch : batches) {
            if (batch != null && !batch.isEmpty()) {
                this.pendingMessages.add(new AwsSqsMsgWrapper(url, batch));
                messages.addAll(batch);
            }
        }
        return messages;
    }

    /**
     * Deletes one received batch from its queue, logging a failure before propagating it.
     */
    private void deleteBatch(AwsSqsMsgWrapper wrapper) {
        try {
            List<DeleteMessageBatchRequestEntry> entries = wrapper.getMessages().stream()
                    .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
                    .collect(Collectors.toList());
            this.sqsClient.deleteMessageBatch(wrapper.getUrl(), entries);
        } catch (RuntimeException e) {
            // The returned Future is discarded by doCommit(), so log here or the failure is lost.
            if (!this.stopped) {
                log.error("[{}] Failed to delete messages from queue [{}].", getTopic(), wrapper.getUrl(), e);
            }
            throw e;
        }
    }

    /**
     * Ensures the queue for the topic exists and returns its URL. The queue name is the
     * topic with dots replaced by underscores and a {@code .fifo} suffix.
     */
    private String getQueueUrl(String topic) {
        this.admin.createTopicIfNotExists(topic);
        return this.sqsClient.getQueueUrl(topic.replace('.', '_') + ".fifo").getQueueUrl();
    }

    /** A batch of received messages together with the URL of the queue it came from. */
    private static class AwsSqsMsgWrapper {
        private final String url;
        private final List<Message> messages;

        public AwsSqsMsgWrapper(String url, List<Message> messages) {
            this.url = url;
            this.messages = messages;
        }

        public String getUrl() {
            return this.url;
        }

        public List<Message> getMessages() {
            return this.messages;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof AwsSqsMsgWrapper)) {
                return false;
            }
            AwsSqsMsgWrapper other = (AwsSqsMsgWrapper) o;
            return other.canEqual(this)
                    && Objects.equals(this.url, other.url)
                    && Objects.equals(this.messages, other.messages);
        }

        protected boolean canEqual(Object other) {
            return other instanceof AwsSqsMsgWrapper;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + (this.url == null ? 43 : this.url.hashCode());
            result = result * prime + (this.messages == null ? 43 : this.messages.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "TbAwsSqsConsumerTemplate.AwsSqsMsgWrapper(url=" + this.getUrl() + ", messages=" + this.getMessages() + ")";
        }
    }
}

