/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.azure.servicebus;

import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.microsoft.azure.servicebus.TransactionContext;
import com.microsoft.azure.servicebus.primitives.ClientEntity;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver;
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.SettleModePair;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;

public class TbServiceBusConsumerTemplate<T extends TbQueueMsg>
extends AbstractTbQueueConsumerTemplate<MessageWithDeliveryTag, T> {
    private static final Logger log = LoggerFactory.getLogger(TbServiceBusConsumerTemplate.class);
    private final TbQueueAdmin admin;
    private final TbQueueMsgDecoder<T> decoder;
    private final TbServiceBusSettings serviceBusSettings;
    private final Gson gson = new Gson();
    private Set<CoreMessageReceiver> receivers;
    private final Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<CoreMessageReceiver, Collection<MessageWithDeliveryTag>>();
    private volatile int messagesPerQueue;

    public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
        super(topic);
        this.admin = admin;
        this.decoder = decoder;
        this.serviceBusSettings = serviceBusSettings;
    }

    @Override
    protected List<MessageWithDeliveryTag> doPoll(long durationInMillis) {
        List messageFutures = this.receivers.stream().map(receiver -> receiver.receiveAsync(this.messagesPerQueue, Duration.ofMillis(durationInMillis)).whenComplete((messages, err) -> {
            if (!CollectionUtils.isEmpty((Collection)messages)) {
                this.pendingMessages.put((CoreMessageReceiver)receiver, (Collection<MessageWithDeliveryTag>)messages);
            } else if (err != null) {
                log.error("Failed to receive messages.", err);
            }
        })).collect(Collectors.toList());
        try {
            return this.fromList(messageFutures).get().stream().flatMap(messages -> CollectionUtils.isEmpty((Collection)messages) ? Stream.empty() : messages.stream()).collect(Collectors.toList());
        }
        catch (InterruptedException | ExecutionException e) {
            if (this.stopped) {
                log.info("[{}] Service Bus consumer is stopped.", (Object)this.getTopic());
            } else {
                log.error("Failed to receive messages", (Throwable)e);
            }
            return Collections.emptyList();
        }
    }

    @Override
    protected void doSubscribe(List<String> topicNames) {
        this.createReceivers();
        this.messagesPerQueue = this.receivers.size() / Math.max(this.partitions.size(), 1);
    }

    @Override
    protected void doCommit() {
        this.pendingMessages.forEach((receiver, msgs) -> msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
        this.pendingMessages.clear();
    }

    @Override
    protected void doUnsubscribe() {
        this.receivers.forEach(ClientEntity::closeAsync);
    }

    private void createReceivers() {
        List receiverFutures = this.partitions.stream().map(TopicPartitionInfo::getFullTopicName).map(queue -> {
            MessagingFactory factory;
            try {
                factory = MessagingFactory.createFromConnectionStringBuilder((ConnectionStringBuilder)this.createConnection((String)queue));
            }
            catch (InterruptedException | ExecutionException e) {
                log.error("Failed to create factory for the queue [{}]", queue);
                throw new RuntimeException("Failed to create the factory", e);
            }
            return CoreMessageReceiver.create((MessagingFactory)factory, (String)queue, (String)queue, (int)0, (SettleModePair)new SettleModePair(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND), (MessagingEntityType)MessagingEntityType.QUEUE);
        }).collect(Collectors.toList());
        try {
            this.receivers = new HashSet<CoreMessageReceiver>(this.fromList(receiverFutures).get());
        }
        catch (InterruptedException | ExecutionException e) {
            if (this.stopped) {
                log.info("[{}] Service Bus consumer is stopped.", (Object)this.getTopic());
            }
            log.error("Failed to create receivers", (Throwable)e);
        }
    }

    private ConnectionStringBuilder createConnection(String queue) {
        this.admin.createTopicIfNotExists(queue);
        return new ConnectionStringBuilder(this.serviceBusSettings.getNamespaceName(), queue, this.serviceBusSettings.getSasKeyName(), this.serviceBusSettings.getSasKey());
    }

    private <V> CompletableFuture<List<V>> fromList(List<CompletableFuture<V>> futures) {
        CompletableFuture[] arrayFuture = new CompletableFuture[futures.size()];
        futures.toArray(arrayFuture);
        return CompletableFuture.allOf(arrayFuture).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    @Override
    protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
        DefaultTbQueueMsg msg = (DefaultTbQueueMsg)this.gson.fromJson(new String(((Data)data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
        return (T)this.decoder.decode((TbQueueMsg)msg);
    }
}

