package org.thingsboard.server.queue.azure.servicebus;

import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.microsoft.azure.servicebus.TransactionContext;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver;
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.SettleModePair;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;

/* loaded from: input_file:org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.class */
public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<MessageWithDeliveryTag, T> {
    private static final Logger log = LoggerFactory.getLogger(TbServiceBusConsumerTemplate.class);
    private final TbQueueAdmin admin;
    private final TbQueueMsgDecoder<T> decoder;
    private final TbServiceBusSettings serviceBusSettings;
    private final Gson gson;
    private Set<CoreMessageReceiver> receivers;
    private final Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages;
    private volatile int messagesPerQueue;

    public TbServiceBusConsumerTemplate(TbQueueAdmin tbQueueAdmin, TbServiceBusSettings tbServiceBusSettings, String str, TbQueueMsgDecoder<T> tbQueueMsgDecoder) {
        super(str);
        this.gson = new Gson();
        this.pendingMessages = new ConcurrentHashMap();
        this.admin = tbQueueAdmin;
        this.decoder = tbQueueMsgDecoder;
        this.serviceBusSettings = tbServiceBusSettings;
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected List<MessageWithDeliveryTag> doPoll(long j) {
        try {
            return (List) ((List) fromList((List) this.receivers.stream().map(coreMessageReceiver -> {
                return coreMessageReceiver.receiveAsync(this.messagesPerQueue, Duration.ofMillis(j)).whenComplete((collection, th) -> {
                    if (!CollectionUtils.isEmpty(collection)) {
                        this.pendingMessages.put(coreMessageReceiver, collection);
                    } else if (th != null) {
                        log.error("Failed to receive messages.", th);
                    }
                });
            }).collect(Collectors.toList())).get()).stream().flatMap(collection -> {
                return CollectionUtils.isEmpty(collection) ? Stream.empty() : collection.stream();
            }).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            if (this.stopped) {
                log.info("[{}] Service Bus consumer is stopped.", getTopic());
            } else {
                log.error("Failed to receive messages", e);
            }
            return Collections.emptyList();
        }
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doSubscribe(List<String> list) {
        createReceivers();
        this.messagesPerQueue = this.receivers.size() / Math.max(this.partitions.size(), 1);
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doCommit() {
        this.pendingMessages.forEach((coreMessageReceiver, collection) -> {
            collection.forEach(messageWithDeliveryTag -> {
                coreMessageReceiver.completeMessageAsync(messageWithDeliveryTag.getDeliveryTag(), TransactionContext.NULL_TXN);
            });
        });
        this.pendingMessages.clear();
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doUnsubscribe() {
        this.receivers.forEach((v0) -> {
            v0.closeAsync();
        });
    }

    private void createReceivers() {
        try {
            this.receivers = new HashSet((Collection) fromList((List) this.partitions.stream().map((v0) -> {
                return v0.getFullTopicName();
            }).map(str -> {
                try {
                    return CoreMessageReceiver.create(MessagingFactory.createFromConnectionStringBuilder(createConnection(str)), str, str, 0, new SettleModePair(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND), MessagingEntityType.QUEUE);
                } catch (InterruptedException | ExecutionException e) {
                    log.error("Failed to create factory for the queue [{}]", str);
                    throw new RuntimeException("Failed to create the factory", e);
                }
            }).collect(Collectors.toList())).get());
        } catch (InterruptedException | ExecutionException e) {
            if (this.stopped) {
                log.info("[{}] Service Bus consumer is stopped.", getTopic());
            } else {
                log.error("Failed to create receivers", e);
            }
        }
    }

    private ConnectionStringBuilder createConnection(String str) {
        this.admin.createTopicIfNotExists(str);
        return new ConnectionStringBuilder(this.serviceBusSettings.getNamespaceName(), str, this.serviceBusSettings.getSasKeyName(), this.serviceBusSettings.getSasKey());
    }

    private <V> CompletableFuture<List<V>> fromList(List<CompletableFuture<V>> list) {
        CompletableFuture[] completableFutureArr = new CompletableFuture[list.size()];
        list.toArray(completableFutureArr);
        return (CompletableFuture<List<V>>) CompletableFuture.allOf(completableFutureArr).thenApply(r4 -> {
            return (List) list.stream().map((v0) -> {
                return v0.join();
            }).collect(Collectors.toList());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    public T decode(MessageWithDeliveryTag messageWithDeliveryTag) throws InvalidProtocolBufferException {
        return (T) this.decoder.decode((DefaultTbQueueMsg) this.gson.fromJson(new String(messageWithDeliveryTag.getMessage().getBody().getValue().getArray()), DefaultTbQueueMsg.class));
    }
}
