/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.kafka.AbstractTbKafkaTemplate;
import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaHandler;

public class TbKafkaResponseTemplate<Request, Response>
extends AbstractTbKafkaTemplate {
    private static final Logger log = LoggerFactory.getLogger(TbKafkaResponseTemplate.class);
    private final TBKafkaConsumerTemplate<Request> requestTemplate;
    private final TBKafkaProducerTemplate<Response> responseTemplate;
    private final TbKafkaHandler<Request, Response> handler;
    private final ConcurrentMap<UUID, String> pendingRequests;
    private final ExecutorService loopExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    private final ExecutorService callbackExecutor;
    private final int maxPendingRequests;
    private final long requestTimeout;
    private final long pollInterval;
    private volatile boolean stopped = false;
    private final AtomicInteger pendingRequestCount = new AtomicInteger();

    public TbKafkaResponseTemplate(TBKafkaConsumerTemplate<Request> requestTemplate, TBKafkaProducerTemplate<Response> responseTemplate, TbKafkaHandler<Request, Response> handler, long pollInterval, long requestTimeout, int maxPendingRequests, ExecutorService executor) {
        this.requestTemplate = requestTemplate;
        this.responseTemplate = responseTemplate;
        this.handler = handler;
        this.pendingRequests = new ConcurrentHashMap<UUID, String>();
        this.maxPendingRequests = maxPendingRequests;
        this.pollInterval = pollInterval;
        this.requestTimeout = requestTimeout;
        this.callbackExecutor = executor;
        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
        this.loopExecutor = Executors.newSingleThreadExecutor();
    }

    public void init() {
        this.responseTemplate.init();
        this.requestTemplate.subscribe();
        this.loopExecutor.submit(() -> {
            while (!this.stopped) {
                try {
                    while (this.pendingRequestCount.get() >= this.maxPendingRequests) {
                        try {
                            Thread.sleep(this.pollInterval);
                        }
                        catch (InterruptedException e) {
                            log.trace("Failed to wait until the server has capacity to handle new requests", (Throwable)e);
                        }
                    }
                    ConsumerRecords<String, byte[]> requests = this.requestTemplate.poll(Duration.ofMillis(this.pollInterval));
                    requests.forEach(request -> {
                        Header requestIdHeader = request.headers().lastHeader("requestId");
                        if (requestIdHeader == null) {
                            log.error("[{}] Missing requestId in header", request);
                            return;
                        }
                        UUID requestId = TbKafkaResponseTemplate.bytesToUuid(requestIdHeader.value());
                        if (requestId == null) {
                            log.error("[{}] Missing requestId in header and body", request);
                            return;
                        }
                        Header responseTopicHeader = request.headers().lastHeader("responseTopic");
                        if (responseTopicHeader == null) {
                            log.error("[{}] Missing response topic in header", request);
                            return;
                        }
                        String responseTopic = this.bytesToString(responseTopicHeader.value());
                        try {
                            this.pendingRequestCount.getAndIncrement();
                            Request decodedRequest = this.requestTemplate.decode((ConsumerRecord<String, byte[]>)request);
                            AsyncCallbackTemplate.withCallbackAndTimeout(this.handler.handle(decodedRequest), response -> {
                                this.pendingRequestCount.decrementAndGet();
                                this.reply(requestId, responseTopic, response);
                            }, e -> {
                                this.pendingRequestCount.decrementAndGet();
                                if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                                    log.warn("[{}] Timedout to process the request: {}", new Object[]{requestId, request, e});
                                } else {
                                    log.trace("[{}] Failed to process the request: {}", new Object[]{requestId, request, e});
                                }
                            }, this.requestTimeout, this.timeoutExecutor, this.callbackExecutor);
                        }
                        catch (Throwable e2) {
                            this.pendingRequestCount.decrementAndGet();
                            log.warn("[{}] Failed to process the request: {}", new Object[]{requestId, request, e2});
                        }
                    });
                }
                catch (InterruptException ie) {
                    if (this.stopped) continue;
                    log.warn("Fetching data from kafka was interrupted.", (Throwable)ie);
                }
                catch (Throwable e) {
                    log.warn("Failed to obtain messages from queue.", e);
                    try {
                        Thread.sleep(this.pollInterval);
                    }
                    catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", (Throwable)e2);
                    }
                }
            }
        });
    }

    public void stop() {
        this.stopped = true;
        if (this.timeoutExecutor != null) {
            this.timeoutExecutor.shutdownNow();
        }
        if (this.loopExecutor != null) {
            this.loopExecutor.shutdownNow();
        }
    }

    private void reply(UUID requestId, String topic, Response response) {
        this.responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader("requestId", this.uuidToBytes(requestId))), null);
    }

    public static <Request, Response> TbKafkaResponseTemplateBuilder<Request, Response> builder() {
        return new TbKafkaResponseTemplateBuilder();
    }

    public static class TbKafkaResponseTemplateBuilder<Request, Response> {
        private TBKafkaConsumerTemplate<Request> requestTemplate;
        private TBKafkaProducerTemplate<Response> responseTemplate;
        private TbKafkaHandler<Request, Response> handler;
        private long pollInterval;
        private long requestTimeout;
        private int maxPendingRequests;
        private ExecutorService executor;

        TbKafkaResponseTemplateBuilder() {
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> requestTemplate(TBKafkaConsumerTemplate<Request> requestTemplate) {
            this.requestTemplate = requestTemplate;
            return this;
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> responseTemplate(TBKafkaProducerTemplate<Response> responseTemplate) {
            this.responseTemplate = responseTemplate;
            return this;
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> handler(TbKafkaHandler<Request, Response> handler) {
            this.handler = handler;
            return this;
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> requestTimeout(long requestTimeout) {
            this.requestTimeout = requestTimeout;
            return this;
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> maxPendingRequests(int maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
            return this;
        }

        public TbKafkaResponseTemplateBuilder<Request, Response> executor(ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        public TbKafkaResponseTemplate<Request, Response> build() {
            return new TbKafkaResponseTemplate<Request, Response>(this.requestTemplate, this.responseTemplate, this.handler, this.pollInterval, this.requestTimeout, this.maxPendingRequests, this.executor);
        }

        public String toString() {
            return "TbKafkaResponseTemplate.TbKafkaResponseTemplateBuilder(requestTemplate=" + this.requestTemplate + ", responseTemplate=" + this.responseTemplate + ", handler=" + this.handler + ", pollInterval=" + this.pollInterval + ", requestTimeout=" + this.requestTimeout + ", maxPendingRequests=" + this.maxPendingRequests + ", executor=" + this.executor + ")";
        }
    }
}

