/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgDecoder;
import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
import org.thingsboard.server.queue.pubsub.TbPubSubSettings;

public class TbPubSubConsumerTemplate<T extends TbQueueMsg>
extends AbstractParallelTbQueueConsumerTemplate<PubsubMessage, T> {
    private static final Logger log = LoggerFactory.getLogger(TbPubSubConsumerTemplate.class);
    private final Gson gson = new Gson();
    private final TbQueueAdmin admin;
    private final String topic;
    private final TbQueueMsgDecoder<T> decoder;
    private final TbPubSubSettings pubSubSettings;
    private volatile Set<String> subscriptionNames;
    private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<AcknowledgeRequest>();
    private final SubscriberStub subscriber;
    private volatile int messagesPerTopic;

    public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) {
        super(topic);
        this.admin = admin;
        this.pubSubSettings = pubSubSettings;
        this.topic = topic;
        this.decoder = decoder;
        try {
            SubscriberStubSettings subscriberStubSettings = ((SubscriberStubSettings.Builder)((SubscriberStubSettings.Builder)SubscriberStubSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider())).setTransportChannelProvider((TransportChannelProvider)SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(Integer.valueOf(pubSubSettings.getMaxMsgSize())).build())).build();
            this.subscriber = GrpcSubscriberStub.create((SubscriberStubSettings)subscriberStubSettings);
        }
        catch (IOException e) {
            log.error("Failed to create subscriber.", (Throwable)e);
            throw new RuntimeException("Failed to create subscriber.", e);
        }
    }

    @Override
    protected List<PubsubMessage> doPoll(long durationInMillis) {
        try {
            List<ReceivedMessage> messages = this.receiveMessages();
            if (!messages.isEmpty()) {
                return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
            }
        }
        catch (InterruptedException | ExecutionException e) {
            if (this.stopped) {
                log.info("[{}] Pub/Sub consumer is stopped.", (Object)this.topic);
            }
            log.error("Failed to receive messages", (Throwable)e);
        }
        return Collections.emptyList();
    }

    @Override
    protected void doSubscribe(List<String> topicNames) {
        this.subscriptionNames = new LinkedHashSet<String>(topicNames);
        this.subscriptionNames.forEach(arg_0 -> ((TbQueueAdmin)this.admin).createTopicIfNotExists(arg_0));
        this.initNewExecutor(this.subscriptionNames.size() + 1);
        this.messagesPerTopic = this.pubSubSettings.getMaxMessages() / Math.max(this.subscriptionNames.size(), 1);
    }

    @Override
    protected void doCommit() {
        this.acknowledgeRequests.forEach(arg_0 -> ((UnaryCallable)this.subscriber.acknowledgeCallable()).futureCall(arg_0));
        this.acknowledgeRequests.clear();
    }

    @Override
    protected void doUnsubscribe() {
        if (this.subscriber != null) {
            this.subscriber.close();
        }
        this.shutdownExecutor();
    }

    private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException {
        List result = this.subscriptionNames.stream().map(subscriptionId -> {
            String subscriptionName = ProjectSubscriptionName.format((String)this.pubSubSettings.getProjectId(), (String)subscriptionId);
            PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(this.messagesPerTopic).setSubscription(subscriptionName).build();
            ApiFuture pullResponseApiFuture = this.subscriber.pullCallable().futureCall((Object)pullRequest);
            return ApiFutures.transform((ApiFuture)pullResponseApiFuture, pullResponse -> {
                if (pullResponse != null && !pullResponse.getReceivedMessagesList().isEmpty()) {
                    ArrayList<String> ackIds = new ArrayList<String>();
                    for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
                        ackIds.add(message.getAckId());
                    }
                    AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(subscriptionName).addAllAckIds(ackIds).build();
                    this.acknowledgeRequests.add(acknowledgeRequest);
                    return pullResponse.getReceivedMessagesList();
                }
                return null;
            }, (Executor)this.consumerExecutor);
        }).collect(Collectors.toList());
        ApiFuture transform = ApiFutures.transform((ApiFuture)ApiFutures.allAsList(result), listMessages -> {
            if (!CollectionUtils.isEmpty((Collection)listMessages)) {
                return listMessages.stream().filter(Objects::nonNull).flatMap(Collection::stream).collect(Collectors.toList());
            }
            return Collections.emptyList();
        }, (Executor)this.consumerExecutor);
        return (List)transform.get();
    }

    @Override
    public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
        DefaultTbQueueMsg msg = (DefaultTbQueueMsg)this.gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
        return (T)this.decoder.decode((TbQueueMsg)msg);
    }
}

