package org.thingsboard.server.queue.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;

/* loaded from: input_file:org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.class */
public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<PubsubMessage, T> {
    private static final Logger log = LoggerFactory.getLogger(TbPubSubConsumerTemplate.class);
    private final Gson gson;
    private final TbQueueAdmin admin;
    private final String topic;
    private final TbQueueMsgDecoder<T> decoder;
    private final TbPubSubSettings pubSubSettings;
    private volatile Set<String> subscriptionNames;
    private final List<AcknowledgeRequest> acknowledgeRequests;
    private final SubscriberStub subscriber;
    private volatile int messagesPerTopic;

    public TbPubSubConsumerTemplate(TbQueueAdmin tbQueueAdmin, TbPubSubSettings tbPubSubSettings, String str, TbQueueMsgDecoder<T> tbQueueMsgDecoder) {
        super(str);
        this.gson = new Gson();
        this.acknowledgeRequests = new CopyOnWriteArrayList();
        this.admin = tbQueueAdmin;
        this.pubSubSettings = tbPubSubSettings;
        this.topic = str;
        this.decoder = tbQueueMsgDecoder;
        try {
            this.subscriber = GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder().setCredentialsProvider(tbPubSubSettings.getCredentialsProvider()).setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(Integer.valueOf(tbPubSubSettings.getMaxMsgSize())).build()).build());
        } catch (IOException e) {
            log.error("Failed to create subscriber.", e);
            throw new RuntimeException("Failed to create subscriber.", e);
        }
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected List<PubsubMessage> doPoll(long j) {
        try {
            List<ReceivedMessage> receiveMessages = receiveMessages();
            if (!receiveMessages.isEmpty()) {
                return (List) receiveMessages.stream().map((v0) -> {
                    return v0.getMessage();
                }).collect(Collectors.toList());
            }
        } catch (InterruptedException | ExecutionException e) {
            if (this.stopped) {
                log.info("[{}] Pub/Sub consumer is stopped.", this.topic);
            } else {
                log.error("Failed to receive messages", e);
            }
        }
        return Collections.emptyList();
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doSubscribe(List<String> list) {
        this.subscriptionNames = new LinkedHashSet(list);
        Set<String> set = this.subscriptionNames;
        TbQueueAdmin tbQueueAdmin = this.admin;
        tbQueueAdmin.getClass();
        set.forEach(tbQueueAdmin::createTopicIfNotExists);
        initNewExecutor(this.subscriptionNames.size() + 1);
        this.messagesPerTopic = this.pubSubSettings.getMaxMessages() / Math.max(this.subscriptionNames.size(), 1);
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doCommit() {
        List<AcknowledgeRequest> list = this.acknowledgeRequests;
        UnaryCallable acknowledgeCallable = this.subscriber.acknowledgeCallable();
        acknowledgeCallable.getClass();
        list.forEach((v1) -> {
            r1.futureCall(v1);
        });
        this.acknowledgeRequests.clear();
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doUnsubscribe() {
        if (this.subscriber != null) {
            this.subscriber.close();
        }
        shutdownExecutor();
    }

    private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException {
        return (List) ApiFutures.transform(ApiFutures.allAsList((List) this.subscriptionNames.stream().map(str -> {
            String format = ProjectSubscriptionName.format(this.pubSubSettings.getProjectId(), str);
            return ApiFutures.transform(this.subscriber.pullCallable().futureCall(PullRequest.newBuilder().setMaxMessages(this.messagesPerTopic).setSubscription(format).build()), pullResponse -> {
                if (pullResponse == null || pullResponse.getReceivedMessagesList().isEmpty()) {
                    return null;
                }
                ArrayList arrayList = new ArrayList();
                Iterator it = pullResponse.getReceivedMessagesList().iterator();
                while (it.hasNext()) {
                    arrayList.add(((ReceivedMessage) it.next()).getAckId());
                }
                this.acknowledgeRequests.add(AcknowledgeRequest.newBuilder().setSubscription(format).addAllAckIds(arrayList).build());
                return pullResponse.getReceivedMessagesList();
            }, this.consumerExecutor);
        }).collect(Collectors.toList())), list -> {
            return !CollectionUtils.isEmpty(list) ? (List) list.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList()) : Collections.emptyList();
        }, this.consumerExecutor).get();
    }

    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    public T decode(PubsubMessage pubsubMessage) throws InvalidProtocolBufferException {
        return this.decoder.decode((DefaultTbQueueMsg) this.gson.fromJson(pubsubMessage.getData().toStringUtf8(), DefaultTbQueueMsg.class));
    }
}
