/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.kafka;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.kafka.AbstractTbKafkaTemplate;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;

public class TbKafkaRequestTemplate<Request, Response>
extends AbstractTbKafkaTemplate {
    private static final Logger log = LoggerFactory.getLogger(TbKafkaRequestTemplate.class);
    private final TBKafkaProducerTemplate<Request> requestTemplate;
    private final TBKafkaConsumerTemplate<Response> responseTemplate;
    private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
    private final boolean internalExecutor;
    private final ExecutorService executor;
    private final long maxRequestTimeout;
    private final long maxPendingRequests;
    private final long pollInterval;
    private volatile long tickTs = 0L;
    private volatile long tickSize = 0L;
    private volatile boolean stopped = false;

    public TbKafkaRequestTemplate(TBKafkaProducerTemplate<Request> requestTemplate, TBKafkaConsumerTemplate<Response> responseTemplate, long maxRequestTimeout, long maxPendingRequests, long pollInterval, ExecutorService executor) {
        this.requestTemplate = requestTemplate;
        this.responseTemplate = responseTemplate;
        this.pendingRequests = new ConcurrentHashMap<UUID, ResponseMetaData<Response>>();
        this.maxRequestTimeout = maxRequestTimeout;
        this.maxPendingRequests = maxPendingRequests;
        this.pollInterval = pollInterval;
        if (executor != null) {
            this.internalExecutor = false;
            this.executor = executor;
        } else {
            this.internalExecutor = true;
            this.executor = Executors.newSingleThreadExecutor();
        }
    }

    public void init() {
        try {
            TBKafkaAdmin admin = new TBKafkaAdmin(this.requestTemplate.getSettings());
            CreateTopicsResult result = admin.createTopic(new NewTopic(this.responseTemplate.getTopic(), 1, 1));
            result.all().get();
        }
        catch (Exception e) {
            if (e instanceof TopicExistsException || e.getCause() != null && e.getCause() instanceof TopicExistsException) {
                log.trace("[{}] Topic already exists. ", (Object)this.responseTemplate.getTopic());
            }
            log.info("[{}] Failed to create topic: {}", new Object[]{this.responseTemplate.getTopic(), e.getMessage(), e});
            throw new RuntimeException(e);
        }
        this.requestTemplate.init();
        this.tickTs = System.currentTimeMillis();
        this.responseTemplate.subscribe();
        this.executor.submit(() -> {
            long nextCleanupMs = 0L;
            while (!this.stopped) {
                try {
                    ConsumerRecords<String, byte[]> responses = this.responseTemplate.poll(Duration.ofMillis(this.pollInterval));
                    if (responses.count() > 0) {
                        log.trace("Polling responses completed, consumer records count [{}]", (Object)responses.count());
                    }
                    responses.forEach(response -> {
                        log.trace("Received response to Kafka Template request: {}", response);
                        Header requestIdHeader = response.headers().lastHeader("requestId");
                        Object decodedResponse = null;
                        UUID requestId = null;
                        if (requestIdHeader == null) {
                            try {
                                decodedResponse = this.responseTemplate.decode((ConsumerRecord<String, byte[]>)response);
                                requestId = this.responseTemplate.extractRequestId(decodedResponse);
                            }
                            catch (IOException e) {
                                log.error("Failed to decode response", (Throwable)e);
                            }
                        } else {
                            requestId = TbKafkaRequestTemplate.bytesToUuid(requestIdHeader.value());
                        }
                        if (requestId == null) {
                            log.error("[{}] Missing requestId in header and body", response);
                        } else {
                            log.trace("[{}] Response received", (Object)requestId);
                            ResponseMetaData expectedResponse = (ResponseMetaData)this.pendingRequests.remove(requestId);
                            if (expectedResponse == null) {
                                log.trace("[{}] Invalid or stale request", (Object)requestId);
                            } else {
                                try {
                                    if (decodedResponse == null) {
                                        decodedResponse = this.responseTemplate.decode((ConsumerRecord<String, byte[]>)response);
                                    }
                                    expectedResponse.future.set(decodedResponse);
                                }
                                catch (IOException e) {
                                    expectedResponse.future.setException((Throwable)e);
                                }
                            }
                        }
                    });
                    this.tickTs = System.currentTimeMillis();
                    this.tickSize = this.pendingRequests.size();
                    if (nextCleanupMs >= this.tickTs) continue;
                    this.pendingRequests.forEach((key, value) -> {
                        ResponseMetaData staleRequest;
                        if (((ResponseMetaData)value).expTime < this.tickTs && (staleRequest = (ResponseMetaData)this.pendingRequests.remove(key)) != null) {
                            log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", new Object[]{key, staleRequest.expTime, this.tickTs});
                            staleRequest.future.setException((Throwable)new TimeoutException());
                        }
                    });
                    nextCleanupMs = this.tickTs + this.maxRequestTimeout;
                }
                catch (InterruptException ie) {
                    if (this.stopped) continue;
                    log.warn("Fetching data from kafka was interrupted.", (Throwable)ie);
                }
                catch (Throwable e) {
                    log.warn("Failed to obtain responses from queue.", e);
                    try {
                        Thread.sleep(this.pollInterval);
                    }
                    catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new responses", (Throwable)e2);
                    }
                }
            }
        });
    }

    public void stop() {
        this.stopped = true;
        if (this.internalExecutor) {
            this.executor.shutdownNow();
        }
    }

    public ListenableFuture<Response> post(String key, Request request) {
        if (this.tickSize > this.maxPendingRequests) {
            return Futures.immediateFailedFuture((Throwable)new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        ArrayList<Header> headers = new ArrayList<Header>(2);
        headers.add((Header)new RecordHeader("requestId", this.uuidToBytes(requestId)));
        headers.add((Header)new RecordHeader("responseTopic", this.stringToBytes(this.responseTemplate.getTopic())));
        SettableFuture future = SettableFuture.create();
        ResponseMetaData responseMetaData = new ResponseMetaData(this.tickTs + this.maxRequestTimeout, future);
        this.pendingRequests.putIfAbsent(requestId, responseMetaData);
        log.trace("[{}] Sending request, key [{}], expTime [{}]", new Object[]{requestId, key, responseMetaData.expTime});
        this.requestTemplate.send(key, request, headers, (metadata, exception) -> {
            if (exception != null) {
                log.trace("[{}] Failed to post the request", (Object)requestId, (Object)exception);
            } else {
                log.trace("[{}] Posted the request: {}", (Object)requestId, (Object)metadata);
            }
        });
        return future;
    }

    public static <Request, Response> TbKafkaRequestTemplateBuilder<Request, Response> builder() {
        return new TbKafkaRequestTemplateBuilder();
    }

    public static class TbKafkaRequestTemplateBuilder<Request, Response> {
        private TBKafkaProducerTemplate<Request> requestTemplate;
        private TBKafkaConsumerTemplate<Response> responseTemplate;
        private long maxRequestTimeout;
        private long maxPendingRequests;
        private long pollInterval;
        private ExecutorService executor;

        TbKafkaRequestTemplateBuilder() {
        }

        public TbKafkaRequestTemplateBuilder<Request, Response> requestTemplate(TBKafkaProducerTemplate<Request> requestTemplate) {
            this.requestTemplate = requestTemplate;
            return this;
        }

        public TbKafkaRequestTemplateBuilder<Request, Response> responseTemplate(TBKafkaConsumerTemplate<Response> responseTemplate) {
            this.responseTemplate = responseTemplate;
            return this;
        }

        public TbKafkaRequestTemplateBuilder<Request, Response> maxRequestTimeout(long maxRequestTimeout) {
            this.maxRequestTimeout = maxRequestTimeout;
            return this;
        }

        public TbKafkaRequestTemplateBuilder<Request, Response> maxPendingRequests(long maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
            return this;
        }

        public TbKafkaRequestTemplateBuilder<Request, Response> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public TbKafkaRequestTemplateBuilder<Request, Response> executor(ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        public TbKafkaRequestTemplate<Request, Response> build() {
            return new TbKafkaRequestTemplate<Request, Response>(this.requestTemplate, this.responseTemplate, this.maxRequestTimeout, this.maxPendingRequests, this.pollInterval, this.executor);
        }

        public String toString() {
            return "TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder(requestTemplate=" + this.requestTemplate + ", responseTemplate=" + this.responseTemplate + ", maxRequestTimeout=" + this.maxRequestTimeout + ", maxPendingRequests=" + this.maxPendingRequests + ", pollInterval=" + this.pollInterval + ", executor=" + this.executor + ")";
        }
    }

    private static class ResponseMetaData<T> {
        private final long expTime;
        private final SettableFuture<T> future;

        ResponseMetaData(long ts, SettableFuture<T> future) {
            this.expTime = ts;
            this.future = future;
        }
    }
}

