package org.thingsboard.server.queue.common;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;

/* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.class */
/**
 * Responder side of a queue based request/response exchange.
 *
 * <p>A single loop thread polls {@code requestTemplate} for requests, hands each one to the
 * {@link TbQueueHandler} supplied to {@link #init(TbQueueHandler)} and publishes the handler's
 * result through {@code responseTemplate} to the topic named in the request's
 * {@code responseTopic} header, echoing the {@code requestId} header back.
 *
 * <p>Polling is paused while the number of in-flight requests is at or above
 * {@code maxPendingRequests}.
 *
 * @param <Request>  type of the consumed request messages
 * @param <Response> type of the produced response messages
 */
public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate implements TbQueueResponseTemplate<Request, Response> {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbQueueResponseTemplate.class);
    private final TbQueueConsumer<Request> requestTemplate;
    private final TbQueueProducer<Response> responseTemplate;
    /** Single thread that runs the polling loop started by {@link #init(TbQueueHandler)}. */
    private final ExecutorService loopExecutor;
    /** Scheduler passed to {@code AsyncCallbackTemplate} to enforce {@link #requestTimeout}. */
    private final ScheduledExecutorService timeoutExecutor;
    /** Caller supplied executor passed to {@code AsyncCallbackTemplate}; not shut down by {@link #stop()}. */
    private final ExecutorService callbackExecutor;
    private final MessagesStats stats;
    private final int maxPendingRequests;
    private final long requestTimeout;
    /** Used both as the poll duration and as the back-off sleep, in the unit expected by {@code Thread.sleep}. */
    private final long pollInterval;
    private volatile boolean stopped = false;
    /** Number of requests handed to the handler whose callback has not fired yet. */
    private final AtomicInteger pendingRequestCount = new AtomicInteger();
    // NOTE(review): never read or written in this class; kept only to preserve the class layout.
    private final ConcurrentMap<UUID, String> pendingRequests = new ConcurrentHashMap<>();

    /**
     * Fluent builder for {@link DefaultTbQueueResponseTemplate}.
     *
     * @param <Request>  type of the consumed request messages
     * @param <Response> type of the produced response messages
     */
    public static class DefaultTbQueueResponseTemplateBuilder<Request extends TbQueueMsg, Response extends TbQueueMsg> {
        private TbQueueConsumer<Request> requestTemplate;
        private TbQueueProducer<Response> responseTemplate;
        private TbQueueHandler<Request, Response> handler;
        private long pollInterval;
        private long requestTimeout;
        private int maxPendingRequests;
        private ExecutorService executor;
        private MessagesStats stats;

        DefaultTbQueueResponseTemplateBuilder() {
        }

        /** Sets the consumer the requests are polled from. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> requestTemplate(TbQueueConsumer<Request> tbQueueConsumer) {
            this.requestTemplate = tbQueueConsumer;
            return this;
        }

        /** Sets the producer the responses are sent with. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> responseTemplate(TbQueueProducer<Response> tbQueueProducer) {
            this.responseTemplate = tbQueueProducer;
            return this;
        }

        /** Sets the handler forwarded to the constructor (which does not retain it). */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> handler(TbQueueHandler<Request, Response> tbQueueHandler) {
            this.handler = tbQueueHandler;
            return this;
        }

        /** Sets the poll duration / back-off sleep. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> pollInterval(long j) {
            this.pollInterval = j;
            return this;
        }

        /** Sets the per-request processing timeout. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> requestTimeout(long j) {
            this.requestTimeout = j;
            return this;
        }

        /** Sets the in-flight request count at which polling is paused. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> maxPendingRequests(int i) {
            this.maxPendingRequests = i;
            return this;
        }

        /** Sets the executor used for handler callbacks. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> executor(ExecutorService executorService) {
            this.executor = executorService;
            return this;
        }

        /** Sets the statistics sink. */
        public DefaultTbQueueResponseTemplateBuilder<Request, Response> stats(MessagesStats messagesStats) {
            this.stats = messagesStats;
            return this;
        }

        /** Creates the template from the values collected so far. */
        public DefaultTbQueueResponseTemplate<Request, Response> build() {
            return new DefaultTbQueueResponseTemplate<>(this.requestTemplate, this.responseTemplate, this.handler, this.pollInterval, this.requestTimeout, this.maxPendingRequests, this.executor, this.stats);
        }

        /** Returns a debug representation in which every label is followed by its own field value. */
        @Override
        public String toString() {
            return "DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder(requestTemplate=" + this.requestTemplate
                    + ", responseTemplate=" + this.responseTemplate
                    + ", handler=" + this.handler
                    + ", pollInterval=" + this.pollInterval
                    + ", requestTimeout=" + this.requestTimeout
                    + ", maxPendingRequests=" + this.maxPendingRequests
                    + ", executor=" + this.executor
                    + ", stats=" + this.stats + ")";
        }
    }

    /**
     * Creates the template and its two internal executors, whose thread names are derived from
     * the request consumer's topic.
     *
     * @param tbQueueConsumer source of requests; must not be {@code null} (its topic is read here)
     * @param tbQueueProducer sink for responses
     * @param tbQueueHandler  not used by the constructor; the handler has to be passed to
     *                        {@link #init(TbQueueHandler)}
     * @param j               poll interval
     * @param j2              request timeout
     * @param i               maximum number of in-flight requests
     * @param executorService executor for handler callbacks
     * @param messagesStats   statistics sink
     */
    public DefaultTbQueueResponseTemplate(TbQueueConsumer<Request> tbQueueConsumer, TbQueueProducer<Response> tbQueueProducer, TbQueueHandler<Request, Response> tbQueueHandler, long j, long j2, int i, ExecutorService executorService, MessagesStats messagesStats) {
        this.requestTemplate = tbQueueConsumer;
        this.responseTemplate = tbQueueProducer;
        this.maxPendingRequests = i;
        this.pollInterval = j;
        this.requestTimeout = j2;
        this.callbackExecutor = executorService;
        this.stats = messagesStats;
        this.timeoutExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-queue-response-template-timeout-" + tbQueueConsumer.getTopic());
        this.loopExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-queue-response-template-loop-" + tbQueueConsumer.getTopic()));
    }

    /**
     * Initializes the producer, subscribes the consumer and starts the polling loop on the
     * loop executor.
     *
     * @param tbQueueHandler handler invoked for every non-expired request
     */
    public void init(TbQueueHandler<Request, Response> tbQueueHandler) {
        this.responseTemplate.init();
        this.requestTemplate.subscribe();
        this.loopExecutor.submit(() -> pollLoop(tbQueueHandler));
    }

    /** Polls, dispatches and commits until {@link #stop()} is called; no failure ends the loop. */
    private void pollLoop(TbQueueHandler<Request, Response> handler) {
        while (!this.stopped) {
            try {
                awaitCapacity();
                if (this.stopped) {
                    // Do not poll a consumer that stop() has already unsubscribed.
                    break;
                }
                List<Request> requests = this.requestTemplate.poll(this.pollInterval);
                if (!requests.isEmpty()) {
                    requests.forEach(request -> processRequest(handler, request));
                    this.requestTemplate.commit();
                }
            } catch (Throwable th) {
                // The task's Future is discarded, so an escaping exception would end polling silently.
                log.warn("Failed to obtain messages from queue.", th);
                sleepPollInterval();
            }
        }
    }

    /** Blocks while the in-flight request count is at the limit, or until the template is stopped. */
    private void awaitCapacity() {
        while (!this.stopped && this.pendingRequestCount.get() >= this.maxPendingRequests) {
            sleepPollInterval();
        }
    }

    /**
     * Sleeps for one poll interval. An interrupt is only logged: the loops re-check
     * {@link #stopped}, and restoring the interrupt flag would turn every later sleep into a
     * busy spin.
     */
    private void sleepPollInterval() {
        try {
            Thread.sleep(this.pollInterval);
        } catch (InterruptedException e) {
            log.trace("Failed to wait until the server has capacity to handle new requests", e);
        }
    }

    /**
     * Validates the headers of one request and passes it to the handler with a timeout.
     * Requests whose {@code expireTs} header is in the past are dropped without logging.
     */
    private void processRequest(TbQueueHandler<Request, Response> handler, Request request) {
        if (bytesToLong(request.getHeaders().get("expireTs")) < System.currentTimeMillis()) {
            return;
        }
        byte[] requestIdHeader = request.getHeaders().get("requestId");
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header", request);
            return;
        }
        byte[] responseTopicHeader = request.getHeaders().get("responseTopic");
        if (responseTopicHeader == null) {
            log.error("[{}] Missing response topic in header", request);
            return;
        }
        UUID requestId = bytesToUuid(requestIdHeader);
        String responseTopic = bytesToString(responseTopicHeader);
        try {
            this.pendingRequestCount.getAndIncrement();
            this.stats.incrementTotal();
            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request), response -> {
                this.pendingRequestCount.decrementAndGet();
                response.getHeaders().put("requestId", uuidToBytes(requestId));
                this.responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, (TbQueueCallback) null);
                this.stats.incrementSuccessful();
            }, failure -> {
                this.pendingRequestCount.decrementAndGet();
                if (failure.getCause() instanceof TimeoutException) {
                    log.warn("[{}] Timeout to process the request: {}", requestId, request, failure);
                } else {
                    log.trace("[{}] Failed to process the request: {}", requestId, request, failure);
                }
                this.stats.incrementFailed();
            }, this.requestTimeout, this.timeoutExecutor, this.callbackExecutor);
        } catch (Throwable th) {
            // The handler failed before a callback was registered, so release the slot here.
            this.pendingRequestCount.decrementAndGet();
            log.warn("[{}] Failed to process the request: {}", requestId, request, th);
            this.stats.incrementFailed();
        }
    }

    /**
     * Signals the polling loop to finish, unsubscribes the consumer, stops the producer and
     * shuts down the internal timeout and loop executors. The callback executor is left alone.
     */
    public void stop() {
        this.stopped = true;
        if (this.requestTemplate != null) {
            this.requestTemplate.unsubscribe();
        }
        if (this.responseTemplate != null) {
            this.responseTemplate.stop();
        }
        if (this.timeoutExecutor != null) {
            this.timeoutExecutor.shutdownNow();
        }
        if (this.loopExecutor != null) {
            this.loopExecutor.shutdownNow();
        }
    }

    /** Returns a new, empty builder. */
    public static <Request extends TbQueueMsg, Response extends TbQueueMsg> DefaultTbQueueResponseTemplateBuilder<Request, Response> builder() {
        return new DefaultTbQueueResponseTemplateBuilder<>();
    }
}
