/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.AbstractTbQueueTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;

/**
 * Server side of a request/response exchange over partitioned queue topics.
 *
 * <p>Request batches are delivered by a {@link PartitionedQueueConsumerManager}. Each request that
 * has not yet expired is handed to the configured {@link TbQueueHandler}; the handler's response is
 * tagged with the request's {@code requestId} header and published to the topic named by the
 * request's {@code responseTopic} header. The number of requests whose handling has not completed
 * yet is tracked, and new batches wait while that number is at or above {@code maxPendingRequests}.
 *
 * @param <Request>  type of the consumed request messages
 * @param <Response> type of the produced response messages
 */
public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg>
        extends AbstractTbQueueTemplate {
    // Header keys read from requests / written to responses.
    // NOTE(review): the superclass presumably declares equivalent constants - confirm and reuse them.
    private static final String EXPIRE_TS_HEADER_KEY = "expireTs";
    private static final String REQUEST_ID_HEADER_KEY = "requestId";
    private static final String RESPONSE_TOPIC_HEADER_KEY = "responseTopic";

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PartitionedQueueResponseTemplate.class);

    private final PartitionedQueueConsumerManager<Request> requestConsumer;
    private final TbQueueProducer<Response> responseProducer;
    private final TbQueueHandler<Request, Response> handler;
    private final long pollInterval;
    private final int maxPendingRequests;
    private final long requestTimeout;
    private final MessagesStats stats;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService callbackExecutor;
    /** Requests handed to the handler whose success/failure callback has not fired yet. */
    private final AtomicInteger pendingRequestCount = new AtomicInteger();

    /**
     * Creates the template and builds (but does not subscribe) the request consumer manager.
     *
     * @param key                  prefix used for the scheduler thread name and the consumer queue key
     * @param handler              processes each request and optionally builds error responses
     * @param requestsTopic        topic passed to the consumer manager
     * @param consumerCreator      creates a consumer for a given topic partition
     * @param responseProducer     producer used to publish responses and error responses
     * @param pollInterval         poll interval for the consumer manager; also the sleep time, in
     *                             milliseconds, between capacity checks
     * @param requestTimeout       timeout passed to {@link AsyncCallbackTemplate#withCallbackAndTimeout}
     *                             (presumably milliseconds - confirm against that helper)
     * @param maxPendingRequests   threshold at which new batches wait for in-flight requests to finish
     * @param consumerExecutor     executor handed to the consumer manager
     * @param callbackExecutor     executor handed to {@link AsyncCallbackTemplate#withCallbackAndTimeout}
     * @param consumerTaskExecutor task executor handed to the consumer manager
     * @param stats                counters for total / successful / failed requests
     */
    public PartitionedQueueResponseTemplate(String key, TbQueueHandler<Request, Response> handler, String requestsTopic, Function<TopicPartitionInfo, TbQueueConsumer<Request>> consumerCreator, TbQueueProducer<Response> responseProducer, long pollInterval, long requestTimeout, int maxPendingRequests, ExecutorService consumerExecutor, ExecutorService callbackExecutor, ExecutorService consumerTaskExecutor, MessagesStats stats) {
        // Assign every plain field first so the processing callback never observes a half-built instance.
        this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(key + "-queue-response-template-scheduler");
        this.callbackExecutor = callbackExecutor;
        this.handler = handler;
        this.responseProducer = responseProducer;
        this.pollInterval = pollInterval;
        this.maxPendingRequests = maxPendingRequests;
        this.requestTimeout = requestTimeout;
        this.stats = stats;
        // The explicit <Request> witness is required: without it the builder chain cannot infer the message type.
        this.requestConsumer = PartitionedQueueConsumerManager.<Request>create()
                .queueKey(key + "-requests")
                .topic(requestsTopic)
                .pollInterval(pollInterval)
                .msgPackProcessor((requests, consumer, consumerKey, config) -> processRequests(requests, consumer))
                .consumerCreator((config, tpi) -> consumerCreator.apply(tpi))
                .consumerExecutor(consumerExecutor)
                .scheduler(this.scheduler)
                .taskExecutor(consumerTaskExecutor)
                .build();
    }

    /**
     * Handles one polled batch: waits for capacity, dispatches every request, then commits.
     *
     * <p>An interrupt received while waiting for capacity does not abort the batch; it is remembered
     * and the thread's interrupt status is restored when this method exits, so the caller can still
     * observe it.
     */
    private void processRequests(List<Request> requests, TbQueueConsumer<Request> consumer) {
        boolean interrupted = awaitCapacity();
        try {
            requests.forEach(this::processRequest);
            consumer.commit();
        } finally {
            if (interrupted) {
                // Restored only now so that handling and commit above run exactly as they did before.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sleeps in {@code pollInterval} steps until fewer than {@code maxPendingRequests} are in flight.
     *
     * @return {@code true} if the thread was interrupted at least once while waiting
     */
    private boolean awaitCapacity() {
        boolean interrupted = false;
        while (pendingRequestCount.get() >= maxPendingRequests) {
            try {
                Thread.sleep(pollInterval);
            } catch (InterruptedException e) {
                // sleep() cleared the flag; record it instead of re-setting it here, which would
                // make every following sleep() throw immediately and turn this loop into a busy spin.
                interrupted = true;
                log.trace("Failed to wait until the server has capacity to handle new requests", e);
            }
        }
        return interrupted;
    }

    /**
     * Validates the headers of a single request and, if it has not expired, passes it to the handler.
     * Requests with a missing header are logged and skipped; expired requests are skipped silently.
     */
    private void processRequest(Request request) {
        byte[] expireTsHeader = request.getHeaders().get(EXPIRE_TS_HEADER_KEY);
        if (expireTsHeader == null) {
            // Same treatment as the other mandatory headers, instead of passing null on to bytesToLong.
            log.error("[{}] Missing expireTs in header", request);
            return;
        }
        long currentTime = System.currentTimeMillis();
        long expireTs = bytesToLong(expireTsHeader);
        if (expireTs < currentTime) {
            return;
        }
        byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER_KEY);
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header", request);
            return;
        }
        byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER_KEY);
        if (responseTopicHeader == null) {
            log.error("[{}] Missing response topic in header", request);
            return;
        }
        final UUID requestId = bytesToUuid(requestIdHeader);
        final String responseTopic = bytesToString(responseTopicHeader);
        try {
            pendingRequestCount.getAndIncrement();
            stats.incrementTotal();
            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request), response -> {
                pendingRequestCount.decrementAndGet();
                sendResponse(requestId, responseTopic, request, response);
            }, e -> {
                pendingRequestCount.decrementAndGet();
                if (isTimeout(e)) {
                    log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                } else {
                    log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                }
                stats.incrementFailed();
            }, requestTimeout, scheduler, callbackExecutor);
        } catch (Throwable e) {
            // Covers synchronous failures (e.g. the handler throwing before a future exists).
            pendingRequestCount.decrementAndGet();
            log.warn("[{}] Failed to process the request: {}", requestId, request, e);
            stats.incrementFailed();
        }
    }

    /**
     * Tags the response with the request id and publishes it to the response topic. If publishing
     * fails, an error response (when the handler provides one) is sent to the same topic.
     */
    private void sendResponse(UUID requestId, String responseTopic, Request request, Response response) {
        response.getHeaders().put(REQUEST_ID_HEADER_KEY, uuidToBytes(requestId));
        TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(responseTopic).build();
        responseProducer.send(tpi, response, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                stats.incrementSuccessful();
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("[{}] Failed to send response {}", requestId, response, t);
                sendErrorResponse(requestId, tpi, request, t);
                stats.incrementFailed();
            }
        });
    }

    /** Returns whether the failure is a timeout, delivered either directly or as the cause. */
    private static boolean isTimeout(Throwable e) {
        // instanceof is null-safe, so no separate null check on the cause is needed.
        return e instanceof TimeoutException || e.getCause() instanceof TimeoutException;
    }

    /**
     * Asks the handler for an error response and, if it returns one, publishes it without a callback.
     */
    private void sendErrorResponse(UUID requestId, TopicPartitionInfo tpi, Request request, Throwable cause) {
        Response errorResponseMsg = handler.constructErrorResponseMsg(request, cause);
        if (errorResponseMsg != null) {
            errorResponseMsg.getHeaders().put(REQUEST_ID_HEADER_KEY, uuidToBytes(requestId));
            responseProducer.send(tpi, errorResponseMsg, null);
        }
    }

    /**
     * Updates the request consumer manager with the given set of partitions.
     *
     * @param partitions partitions to consume requests from
     */
    public void subscribe(Set<TopicPartitionInfo> partitions) {
        requestConsumer.update(partitions);
    }

    /**
     * Stops the request consumer (waiting for it to finish), then the response producer, then the
     * scheduler. Each step runs even if an earlier one throws, so no component is left running.
     */
    public void stop() {
        try {
            if (requestConsumer != null) {
                requestConsumer.stop();
                requestConsumer.awaitStop();
            }
        } finally {
            try {
                if (responseProducer != null) {
                    responseProducer.stop();
                }
            } finally {
                if (scheduler != null) {
                    scheduler.shutdownNow();
                }
            }
        }
    }

    /** Creates an empty builder for this template. */
    @Generated
    public static <Request extends TbQueueMsg, Response extends TbQueueMsg> PartitionedQueueResponseTemplateBuilder<Request, Response> builder() {
        return new PartitionedQueueResponseTemplateBuilder<>();
    }

    /** Returns the consumer manager that delivers request batches to this template. */
    @Generated
    public PartitionedQueueConsumerManager<Request> getRequestConsumer() {
        return this.requestConsumer;
    }

    /**
     * Fluent builder whose properties map one-to-one onto the constructor parameters of
     * {@link PartitionedQueueResponseTemplate}. Unset properties keep their Java default values.
     */
    @Generated
    public static class PartitionedQueueResponseTemplateBuilder<Request extends TbQueueMsg, Response extends TbQueueMsg> {
        @Generated
        private String key;
        @Generated
        private TbQueueHandler<Request, Response> handler;
        @Generated
        private String requestsTopic;
        @Generated
        private Function<TopicPartitionInfo, TbQueueConsumer<Request>> consumerCreator;
        @Generated
        private TbQueueProducer<Response> responseProducer;
        @Generated
        private long pollInterval;
        @Generated
        private long requestTimeout;
        @Generated
        private int maxPendingRequests;
        @Generated
        private ExecutorService consumerExecutor;
        @Generated
        private ExecutorService callbackExecutor;
        @Generated
        private ExecutorService consumerTaskExecutor;
        @Generated
        private MessagesStats stats;

        @Generated
        PartitionedQueueResponseTemplateBuilder() {
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> key(String key) {
            this.key = key;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> handler(TbQueueHandler<Request, Response> handler) {
            this.handler = handler;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> requestsTopic(String requestsTopic) {
            this.requestsTopic = requestsTopic;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> consumerCreator(Function<TopicPartitionInfo, TbQueueConsumer<Request>> consumerCreator) {
            this.consumerCreator = consumerCreator;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> responseProducer(TbQueueProducer<Response> responseProducer) {
            this.responseProducer = responseProducer;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> requestTimeout(long requestTimeout) {
            this.requestTimeout = requestTimeout;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> maxPendingRequests(int maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> callbackExecutor(ExecutorService callbackExecutor) {
            this.callbackExecutor = callbackExecutor;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> consumerTaskExecutor(ExecutorService consumerTaskExecutor) {
            this.consumerTaskExecutor = consumerTaskExecutor;
            return this;
        }

        @Generated
        public PartitionedQueueResponseTemplateBuilder<Request, Response> stats(MessagesStats stats) {
            this.stats = stats;
            return this;
        }

        /** Constructs the template from the values collected so far. */
        @Generated
        public PartitionedQueueResponseTemplate<Request, Response> build() {
            return new PartitionedQueueResponseTemplate<Request, Response>(this.key, this.handler, this.requestsTopic, this.consumerCreator, this.responseProducer, this.pollInterval, this.requestTimeout, this.maxPendingRequests, this.consumerExecutor, this.callbackExecutor, this.consumerTaskExecutor, this.stats);
        }

        @Generated
        @Override
        public String toString() {
            return "PartitionedQueueResponseTemplate.PartitionedQueueResponseTemplateBuilder(key=" + this.key
                    + ", handler=" + this.handler
                    + ", requestsTopic=" + this.requestsTopic
                    + ", consumerCreator=" + this.consumerCreator
                    + ", responseProducer=" + this.responseProducer
                    + ", pollInterval=" + this.pollInterval
                    + ", requestTimeout=" + this.requestTimeout
                    + ", maxPendingRequests=" + this.maxPendingRequests
                    + ", consumerExecutor=" + this.consumerExecutor
                    + ", callbackExecutor=" + this.callbackExecutor
                    + ", consumerTaskExecutor=" + this.consumerTaskExecutor
                    + ", stats=" + this.stats + ")";
        }
    }
}

