package org.thingsboard.server.queue.common;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;

/* loaded from: input_file:org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.class */
public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate {
    private static final Logger log = LoggerFactory.getLogger(PartitionedQueueResponseTemplate.class);
    private final PartitionedQueueConsumerManager<Request> requestConsumer;
    private final TbQueueProducer<Response> responseProducer;
    private final TbQueueHandler<Request, Response> handler;
    private final long pollInterval;
    private final int maxPendingRequests;
    private final long requestTimeout;
    private final MessagesStats stats;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService callbackExecutor;
    private final AtomicInteger pendingRequestCount = new AtomicInteger();

    /* loaded from: input_file:org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate$PartitionedQueueResponseTemplateBuilder.class */
    public static class PartitionedQueueResponseTemplateBuilder<Request extends TbQueueMsg, Response extends TbQueueMsg> {
        private String key;
        private TbQueueHandler<Request, Response> handler;
        private String requestsTopic;
        private Function<TopicPartitionInfo, TbQueueConsumer<Request>> consumerCreator;
        private TbQueueProducer<Response> responseProducer;
        private long pollInterval;
        private long requestTimeout;
        private int maxPendingRequests;
        private ExecutorService consumerExecutor;
        private ExecutorService callbackExecutor;
        private ExecutorService consumerTaskExecutor;
        private MessagesStats stats;

        PartitionedQueueResponseTemplateBuilder() {
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> key(String str) {
            this.key = str;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> handler(TbQueueHandler<Request, Response> tbQueueHandler) {
            this.handler = tbQueueHandler;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> requestsTopic(String str) {
            this.requestsTopic = str;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> consumerCreator(Function<TopicPartitionInfo, TbQueueConsumer<Request>> function) {
            this.consumerCreator = function;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> responseProducer(TbQueueProducer<Response> tbQueueProducer) {
            this.responseProducer = tbQueueProducer;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> pollInterval(long j) {
            this.pollInterval = j;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> requestTimeout(long j) {
            this.requestTimeout = j;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> maxPendingRequests(int i) {
            this.maxPendingRequests = i;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> consumerExecutor(ExecutorService executorService) {
            this.consumerExecutor = executorService;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> callbackExecutor(ExecutorService executorService) {
            this.callbackExecutor = executorService;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> consumerTaskExecutor(ExecutorService executorService) {
            this.consumerTaskExecutor = executorService;
            return this;
        }

        public PartitionedQueueResponseTemplateBuilder<Request, Response> stats(MessagesStats messagesStats) {
            this.stats = messagesStats;
            return this;
        }

        public PartitionedQueueResponseTemplate<Request, Response> build() {
            return new PartitionedQueueResponseTemplate<>(this.key, this.handler, this.requestsTopic, this.consumerCreator, this.responseProducer, this.pollInterval, this.requestTimeout, this.maxPendingRequests, this.consumerExecutor, this.callbackExecutor, this.consumerTaskExecutor, this.stats);
        }

        public String toString() {
            String str = this.key;
            String valueOf = String.valueOf(this.handler);
            String str2 = this.requestsTopic;
            String valueOf2 = String.valueOf(this.consumerCreator);
            String valueOf3 = String.valueOf(this.responseProducer);
            long j = this.pollInterval;
            long j2 = this.requestTimeout;
            int i = this.maxPendingRequests;
            String valueOf4 = String.valueOf(this.consumerExecutor);
            String valueOf5 = String.valueOf(this.callbackExecutor);
            String.valueOf(this.consumerTaskExecutor);
            String.valueOf(this.stats);
            return "PartitionedQueueResponseTemplate.PartitionedQueueResponseTemplateBuilder(key=" + str + ", handler=" + valueOf + ", requestsTopic=" + str2 + ", consumerCreator=" + valueOf2 + ", responseProducer=" + valueOf3 + ", pollInterval=" + j + ", requestTimeout=" + str + ", maxPendingRequests=" + j2 + ", consumerExecutor=" + str + ", callbackExecutor=" + i + ", consumerTaskExecutor=" + valueOf4 + ", stats=" + valueOf5 + ")";
        }
    }

    public PartitionedQueueResponseTemplate(String str, TbQueueHandler<Request, Response> tbQueueHandler, String str2, Function<TopicPartitionInfo, TbQueueConsumer<Request>> function, TbQueueProducer<Response> tbQueueProducer, long j, long j2, int i, ExecutorService executorService, ExecutorService executorService2, ExecutorService executorService3, MessagesStats messagesStats) {
        this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(str + "-queue-response-template-scheduler");
        this.callbackExecutor = executorService2;
        this.handler = tbQueueHandler;
        this.requestConsumer = PartitionedQueueConsumerManager.create().queueKey(str + "-requests").topic(str2).pollInterval(j).msgPackProcessor((list, tbQueueConsumer, queueConfig) -> {
            processRequests(list, tbQueueConsumer);
        }).consumerCreator((queueConfig2, topicPartitionInfo) -> {
            return (TbQueueConsumer) function.apply(topicPartitionInfo);
        }).consumerExecutor(executorService).scheduler(this.scheduler).taskExecutor(executorService3).build();
        this.responseProducer = tbQueueProducer;
        this.pollInterval = j;
        this.maxPendingRequests = i;
        this.requestTimeout = j2;
        this.stats = messagesStats;
    }

    private void processRequests(List<Request> list, TbQueueConsumer<Request> tbQueueConsumer) {
        while (this.pendingRequestCount.get() >= this.maxPendingRequests) {
            try {
                Thread.sleep(this.pollInterval);
            } catch (InterruptedException e) {
                log.trace("Failed to wait until the server has capacity to handle new requests", e);
            }
        }
        list.forEach(tbQueueMsg -> {
            if (bytesToLong(tbQueueMsg.getHeaders().get("expireTs")) >= System.currentTimeMillis()) {
                byte[] bArr = tbQueueMsg.getHeaders().get("requestId");
                if (bArr == null) {
                    log.error("[{}] Missing requestId in header", tbQueueMsg);
                    return;
                }
                byte[] bArr2 = tbQueueMsg.getHeaders().get("responseTopic");
                if (bArr2 == null) {
                    log.error("[{}] Missing response topic in header", tbQueueMsg);
                    return;
                }
                UUID bytesToUuid = bytesToUuid(bArr);
                String bytesToString = bytesToString(bArr2);
                try {
                    this.pendingRequestCount.getAndIncrement();
                    this.stats.incrementTotal();
                    AsyncCallbackTemplate.withCallbackAndTimeout(this.handler.handle(tbQueueMsg), tbQueueMsg -> {
                        this.pendingRequestCount.decrementAndGet();
                        tbQueueMsg.getHeaders().put("requestId", uuidToBytes(bytesToUuid));
                        this.responseProducer.send(TopicPartitionInfo.builder().topic(bytesToString).build(), tbQueueMsg, (TbQueueCallback) null);
                        this.stats.incrementSuccessful();
                    }, th -> {
                        this.pendingRequestCount.decrementAndGet();
                        if (th.getCause() == null || !(th.getCause() instanceof TimeoutException)) {
                            log.trace("[{}] Failed to process the request: {}", new Object[]{bytesToUuid, tbQueueMsg, th});
                        } else {
                            log.warn("[{}] Timeout to process the request: {}", new Object[]{bytesToUuid, tbQueueMsg, th});
                        }
                        this.stats.incrementFailed();
                    }, this.requestTimeout, this.scheduler, this.callbackExecutor);
                } catch (Throwable th2) {
                    this.pendingRequestCount.decrementAndGet();
                    log.warn("[{}] Failed to process the request: {}", new Object[]{bytesToUuid, tbQueueMsg, th2});
                    this.stats.incrementFailed();
                }
            }
        });
        tbQueueConsumer.commit();
    }

    public void subscribe(Set<TopicPartitionInfo> set) {
        this.requestConsumer.update(set);
    }

    public void stop() {
        if (this.requestConsumer != null) {
            this.requestConsumer.stop();
            this.requestConsumer.awaitStop();
        }
        if (this.responseProducer != null) {
            this.responseProducer.stop();
        }
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
        }
    }

    public static <Request extends TbQueueMsg, Response extends TbQueueMsg> PartitionedQueueResponseTemplateBuilder<Request, Response> builder() {
        return new PartitionedQueueResponseTemplateBuilder<>();
    }

    public PartitionedQueueConsumerManager<Request> getRequestConsumer() {
        return this.requestConsumer;
    }
}
