package org.thingsboard.server.kafka;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/thingsboard/server/kafka/TbKafkaRequestTemplate.class */
/**
 * Request/response messaging on top of Kafka.
 *
 * <p>{@link #post(String, Object)} tags every outgoing record with a random request id and the
 * name of the response topic, registers a future under that id and hands the record to the
 * request producer. A background task started by {@link #init()} polls the response consumer,
 * matches incoming records to pending requests by id and completes the corresponding futures.
 *
 * <p>Pending requests are swept for expiry by the polling task at most once per
 * {@code maxRequestTimeout} milliseconds, so a request can stay pending for longer than
 * {@code maxRequestTimeout} before its future fails with a {@link TimeoutException}.
 *
 * @param <Request>  type of the request payload
 * @param <Response> type of the response payload
 */
public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTemplate {
    private static final Logger log = LoggerFactory.getLogger(TbKafkaRequestTemplate.class);

    private final TBKafkaProducerTemplate<Request> requestTemplate;
    private final TBKafkaConsumerTemplate<Response> responseTemplate;
    /** {@code true} when {@link #executor} was created by this instance and must be shut down by it. */
    private final boolean internalExecutor;
    private final ExecutorService executor;
    /** Request time-to-live in milliseconds; also the minimum interval between expiry sweeps. */
    private final long maxRequestTimeout;
    /** Threshold on the last observed number of pending requests above which new posts are rejected. */
    private final long maxPendingRequests;
    /** Poll timeout and error back-off delay, in milliseconds. */
    private final long pollInterval;
    /** Wall-clock time (ms) recorded in {@link #init()} and after every successful poll. */
    private volatile long tickTs = 0;
    /** Size of {@link #pendingRequests} as observed after the last successful poll. */
    private volatile long tickSize = 0;
    private volatile boolean stopped = false;
    /** Requests that were posted and are still waiting for a response, keyed by request id. */
    private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests = new ConcurrentHashMap<>();

    /** Bookkeeping for one pending request: its expiry time (ms) and the future handed to the caller. */
    private static class ResponseMetaData<T> {
        private final long expTime;
        private final SettableFuture<T> future;

        ResponseMetaData(long expTime, SettableFuture<T> future) {
            this.expTime = expTime;
            this.future = future;
        }
    }

    /**
     * Fluent builder for {@link TbKafkaRequestTemplate}; obtain one via
     * {@link TbKafkaRequestTemplate#builder()}.
     */
    public static class TbKafkaRequestTemplateBuilder<Request, Response> {
        private TBKafkaProducerTemplate<Request> requestTemplate;
        private TBKafkaConsumerTemplate<Response> responseTemplate;
        private long maxRequestTimeout;
        private long maxPendingRequests;
        private long pollInterval;
        private ExecutorService executor;

        TbKafkaRequestTemplateBuilder() {
        }

        /** Sets the producer used to send requests. */
        public TbKafkaRequestTemplateBuilder<Request, Response> requestTemplate(TBKafkaProducerTemplate<Request> requestTemplate) {
            this.requestTemplate = requestTemplate;
            return this;
        }

        /** Sets the consumer used to receive responses. */
        public TbKafkaRequestTemplateBuilder<Request, Response> responseTemplate(TBKafkaConsumerTemplate<Response> responseTemplate) {
            this.responseTemplate = responseTemplate;
            return this;
        }

        /** Sets the request time-to-live in milliseconds. */
        public TbKafkaRequestTemplateBuilder<Request, Response> maxRequestTimeout(long maxRequestTimeout) {
            this.maxRequestTimeout = maxRequestTimeout;
            return this;
        }

        /** Sets the pending-request threshold above which new posts are rejected. */
        public TbKafkaRequestTemplateBuilder<Request, Response> maxPendingRequests(long maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
            return this;
        }

        /** Sets the poll timeout / error back-off delay in milliseconds. */
        public TbKafkaRequestTemplateBuilder<Request, Response> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        /** Sets the executor that runs the polling task; when left {@code null} the template creates its own. */
        public TbKafkaRequestTemplateBuilder<Request, Response> executor(ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /** Creates the template from the values collected so far. */
        public TbKafkaRequestTemplate<Request, Response> build() {
            return new TbKafkaRequestTemplate<>(this.requestTemplate, this.responseTemplate, this.maxRequestTimeout, this.maxPendingRequests, this.pollInterval, this.executor);
        }

        @Override
        public String toString() {
            return "TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder(requestTemplate=" + this.requestTemplate + ", responseTemplate=" + this.responseTemplate + ", maxRequestTimeout=" + this.maxRequestTimeout + ", maxPendingRequests=" + this.maxPendingRequests + ", pollInterval=" + this.pollInterval + ", executor=" + this.executor + ")";
        }
    }

    /**
     * Creates a template.
     *
     * @param requestTemplate    producer used to send requests
     * @param responseTemplate   consumer used to receive responses
     * @param maxRequestTimeout  request time-to-live in milliseconds
     * @param maxPendingRequests pending-request threshold above which {@link #post} rejects new requests
     * @param pollInterval       poll timeout / error back-off delay in milliseconds
     * @param executor           executor for the polling task, or {@code null} to let the template
     *                           create (and later shut down) a single-thread executor of its own
     */
    public TbKafkaRequestTemplate(TBKafkaProducerTemplate<Request> requestTemplate, TBKafkaConsumerTemplate<Response> responseTemplate, long maxRequestTimeout, long maxPendingRequests, long pollInterval, ExecutorService executor) {
        this.requestTemplate = requestTemplate;
        this.responseTemplate = responseTemplate;
        this.maxRequestTimeout = maxRequestTimeout;
        this.maxPendingRequests = maxPendingRequests;
        this.pollInterval = pollInterval;
        this.internalExecutor = executor == null;
        this.executor = this.internalExecutor ? Executors.newSingleThreadExecutor() : executor;
    }

    /**
     * Creates the response topic if necessary, initializes the request producer, subscribes the
     * response consumer and submits the polling task to the executor.
     *
     * @throws RuntimeException if the response topic cannot be created for a reason other than
     *                          it already existing
     */
    public void init() {
        createResponseTopic();
        this.requestTemplate.init();
        this.tickTs = System.currentTimeMillis();
        this.responseTemplate.subscribe();
        this.executor.submit(this::pollResponses);
    }

    /** Creates the response topic (1 partition, replication factor 1); an already existing topic is not an error. */
    private void createResponseTopic() {
        String topic = this.responseTemplate.getTopic();
        try {
            new TBKafkaAdmin(this.requestTemplate.getSettings()).createTopic(new NewTopic(topic, 1, (short) 1)).all().get();
        } catch (Exception e) {
            if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) {
                log.trace("[{}] Topic already exists. ", topic);
                return;
            }
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from the caller's thread.
                Thread.currentThread().interrupt();
            }
            log.info("[{}] Failed to create topic: {}", topic, e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Body of the polling task: polls responses, dispatches them to pending requests and
     * periodically expires timed-out requests. Runs until {@link #stop()} is called or the
     * worker thread is interrupted.
     */
    private void pollResponses() {
        long nextExpiryCheckTs = 0;
        while (!this.stopped) {
            try {
                ConsumerRecords<String, byte[]> records = this.responseTemplate.poll(Duration.ofMillis(this.pollInterval));
                if (records.count() > 0) {
                    log.trace("Polling responses completed, consumer records count [{}]", records.count());
                }
                records.forEach(this::processResponse);
                this.tickTs = System.currentTimeMillis();
                this.tickSize = this.pendingRequests.size();
                if (nextExpiryCheckTs < this.tickTs) {
                    expireTimedOutRequests();
                    nextExpiryCheckTs = this.tickTs + this.maxRequestTimeout;
                }
            } catch (InterruptException e) {
                if (!this.stopped) {
                    log.warn("Fetching data from kafka was interrupted.", e);
                }
                // An interrupt is a request to terminate the worker; do not keep polling.
                return;
            } catch (Throwable t) {
                log.warn("Failed to obtain responses from queue.", t);
                try {
                    Thread.sleep(this.pollInterval);
                } catch (InterruptedException interrupted) {
                    log.trace("Failed to wait until the server has capacity to handle new responses", interrupted);
                    // sleep() cleared the flag; restore it for the executor and stop the worker.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Matches one response record to its pending request and completes that request's future.
     * The request id is taken from the record header when present, otherwise from the decoded body.
     */
    private void processResponse(ConsumerRecord<String, byte[]> consumerRecord) {
        log.trace("Received response to Kafka Template request: {}", consumerRecord);
        Header requestIdHeader = consumerRecord.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
        Response response = null;
        UUID requestId = null;
        if (requestIdHeader != null) {
            requestId = bytesToUuid(requestIdHeader.value());
        } else {
            try {
                response = this.responseTemplate.decode(consumerRecord);
                requestId = this.responseTemplate.extractRequestId(response);
            } catch (IOException e) {
                log.error("Failed to decode response", e);
            }
        }
        if (requestId == null) {
            log.error("[{}] Missing requestId in header and body", consumerRecord);
            return;
        }
        log.trace("[{}] Response received", requestId);
        ResponseMetaData<Response> pending = this.pendingRequests.remove(requestId);
        if (pending == null) {
            log.trace("[{}] Invalid or stale request", requestId);
            return;
        }
        if (response == null) {
            // The id came from the header, so the body has not been decoded yet.
            try {
                response = this.responseTemplate.decode(consumerRecord);
            } catch (IOException e) {
                pending.future.setException(e);
                return;
            }
        }
        pending.future.set(response);
    }

    /** Fails with {@link TimeoutException} every pending request whose expiry time is before the current tick. */
    private void expireTimedOutRequests() {
        this.pendingRequests.entrySet().forEach(entry -> {
            if (entry.getValue().expTime >= this.tickTs) {
                return;
            }
            // remove() may return null if a response claimed the request in the meantime.
            ResponseMetaData<Response> expired = this.pendingRequests.remove(entry.getKey());
            if (expired == null) {
                return;
            }
            log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", entry.getKey(), expired.expTime, this.tickTs);
            expired.future.setException(new TimeoutException());
        });
    }

    /**
     * Signals the polling task to finish and, if the executor was created by this template,
     * shuts that executor down.
     */
    public void stop() {
        this.stopped = true;
        if (this.internalExecutor) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Sends a request and returns a future for its response.
     *
     * <p>The returned future fails with a {@link RuntimeException} immediately when too many
     * requests are pending, with the producer's error when the record could not be sent, and
     * with a {@link TimeoutException} when no response arrives before the request expires.
     *
     * @param key     key of the Kafka record
     * @param request request payload
     * @return future completed with the decoded response or failed as described above
     */
    public ListenableFuture<Response> post(String key, Request request) {
        if (this.tickSize > this.maxPendingRequests) {
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        List<Header> headers = new ArrayList<>(2);
        headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
        headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(this.responseTemplate.getTopic())));
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(this.tickTs + this.maxRequestTimeout, future);
        // Register before sending so that a fast response always finds its pending entry.
        this.pendingRequests.putIfAbsent(requestId, responseMetaData);
        try {
            Request enriched = this.requestTemplate.enrich(request, this.responseTemplate.getTopic(), requestId);
            log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
            this.requestTemplate.send(key, enriched, headers, (recordMetadata, exception) -> {
                if (exception != null) {
                    log.trace("[{}] Failed to post the request", requestId, exception);
                    // No response can arrive for a record that was never sent: fail fast
                    // instead of leaving the caller waiting for the timeout sweep.
                    this.pendingRequests.remove(requestId);
                    future.setException(exception);
                } else {
                    log.trace("[{}] Posted the request", requestId, recordMetadata);
                }
            });
        } catch (RuntimeException e) {
            // The caller never receives the future, so do not keep its entry around.
            this.pendingRequests.remove(requestId);
            throw e;
        }
        return future;
    }

    /** Returns a new builder for this template. */
    public static <Request, Response> TbKafkaRequestTemplateBuilder<Request, Response> builder() {
        return new TbKafkaRequestTemplateBuilder<>();
    }
}
