/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AbstractTbQueueTemplate;

public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg>
extends AbstractTbQueueTemplate
implements TbQueueRequestTemplate<Request, Response> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTbQueueRequestTemplate.class);
    // Used by init() to create the response topic if it does not exist yet.
    private final TbQueueAdmin queueAdmin;
    // Producer that outgoing requests are handed to.
    private final TbQueueProducer<Request> requestTemplate;
    // Consumer that is polled for responses; its topic is written into every request's "responseTopic" header.
    private final TbQueueConsumer<Response> responseTemplate;
    // In-flight requests keyed by the generated request id. Entries are removed when a response arrives,
    // when the producer reports a send failure, or when the stale-request cleanup expires them.
    final ConcurrentHashMap<UUID, ResponseMetaData<Response>> pendingRequests = new ConcurrentHashMap();
    // True when no executor was supplied to the constructor; only then does stop() shut the executor down.
    final boolean internalExecutor;
    // Executor that runs mainLoop().
    final ExecutorService executor;
    // maxRequestTimeout converted from milliseconds to nanoseconds; default per-request timeout and cleanup period.
    final long maxRequestTimeoutNs;
    // Maximum request timeout in milliseconds; added to the wall clock to build the "expireTs" header.
    final long maxRequestTimeout;
    // Upper bound on pendingRequests size checked before a new request is accepted.
    final long maxPendingRequests;
    // Value passed to the consumer's poll(); mainLoop() also treats it as milliseconds for its error back-off.
    final long pollInterval;
    // Set by stop(); checked by mainLoop() at the top of every iteration.
    volatile boolean stopped = false;
    // Earliest getCurrentClockNs() value at which the next stale-request scan runs.
    // Read and written only while cleanerLock is held (see tryCleanStaleRequests / setupNextCleanup).
    // NOTE(review): starts at 0, so the first scan only triggers once the clock value is greater than 0.
    long nextCleanupNs = 0L;
    // Ensures at most one thread performs the stale-request scan at a time.
    private final Lock cleanerLock = new ReentrantLock();
    // Optional counters; every use is guarded by a null check.
    private MessagesStats messagesStats;

    /**
     * Creates a request/response template on top of a request producer and a response consumer.
     *
     * @param queueAdmin         admin used by {@link #init()} to create the response topic
     * @param requestTemplate    producer that requests are sent through
     * @param responseTemplate   consumer that responses are polled from
     * @param maxRequestTimeout  maximum request timeout in milliseconds
     * @param maxPendingRequests maximum number of in-flight requests accepted by {@code send}
     * @param pollInterval       value handed to the consumer's {@code poll}
     * @param executor           executor that runs the polling loop, or {@code null} to have one
     *                           created via {@link #createExecutor()}
     */
    public DefaultTbQueueRequestTemplate(TbQueueAdmin queueAdmin, TbQueueProducer<Request> requestTemplate, TbQueueConsumer<Response> responseTemplate, long maxRequestTimeout, long maxPendingRequests, long pollInterval, @Nullable ExecutorService executor) {
        this.queueAdmin = queueAdmin;
        this.requestTemplate = requestTemplate;
        // Assigned before createExecutor() can run: the executor thread name is built from the response topic.
        this.responseTemplate = responseTemplate;
        this.maxRequestTimeoutNs = TimeUnit.MILLISECONDS.toNanos(maxRequestTimeout);
        this.maxRequestTimeout = maxRequestTimeout;
        this.maxPendingRequests = maxPendingRequests;
        this.pollInterval = pollInterval;
        if (executor == null) {
            // No executor supplied: this instance owns the one it creates and shuts it down in stop().
            this.internalExecutor = true;
            this.executor = this.createExecutor();
        } else {
            this.internalExecutor = false;
            this.executor = executor;
        }
    }

    /**
     * Builds the single-thread executor used when the caller did not supply one.
     * The name passed to the thread factory is {@code "tb-queue-request-template-"} followed by the response topic.
     *
     * @return a new single-thread executor
     */
    ExecutorService createExecutor() {
        String factoryName = "tb-queue-request-template-" + this.responseTemplate.getTopic();
        return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(factoryName));
    }

    /**
     * Prepares the response side and starts polling: creates the response topic if needed,
     * subscribes the response consumer, then submits {@link #mainLoop()} to the executor.
     */
    public void init() {
        String responseTopic = this.responseTemplate.getTopic();
        this.queueAdmin.createTopicIfNotExists(responseTopic);
        this.responseTemplate.subscribe();
        Runnable pollingLoop = this::mainLoop;
        this.executor.submit(pollingLoop);
    }

    /**
     * Polling loop executed on the template's executor until {@link #stop()} sets {@code stopped}.
     * <p>
     * Each iteration fetches and processes one batch of responses. If an iteration throws, the loop
     * logs the failure and backs off for whatever is left of {@code pollInterval} (milliseconds)
     * after subtracting the time the failed iteration already took.
     */
    void mainLoop() {
        while (!this.stopped) {
            TbStopWatch sw = TbStopWatch.create();
            try {
                this.fetchAndProcessResponses();
            } catch (Throwable e) {
                long remainingNanos = TimeUnit.MILLISECONDS.toNanos(this.pollInterval) - sw.stopAndGetTotalTimeNanos();
                // The failed iteration may have taken longer than pollInterval; never report or
                // request a negative sleep.
                long sleepNanos = Math.max(0L, remainingNanos);
                // Trailing Throwable is logged with its stack trace, not consumed as a placeholder argument.
                log.warn("Failed to obtain and process responses from queue. Going to sleep {}ns", sleepNanos, e);
                this.sleep(sleepNanos);
            }
        }
    }

    /**
     * Runs one polling iteration: polls the response consumer, dispatches every received response
     * to {@link #processResponse}, commits the consumer, and then attempts a stale-request cleanup.
     */
    void fetchAndProcessResponses() {
        final long pendingCount = this.pendingRequests.mappingCount();
        log.trace("Starting template pool topic {}, for pendingRequests {}", this.responseTemplate.getTopic(), pendingCount);
        final List<Response> polled = this.doPoll();
        log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}] responses", this.responseTemplate.getTopic(), pendingCount, polled.size());
        for (Response response : polled) {
            this.processResponse(response);
        }
        // Commit only after the whole batch has been dispatched.
        this.responseTemplate.commit();
        this.tryCleanStaleRequests();
    }

    /**
     * Expires pending requests whose deadline has passed, at most once per {@code maxRequestTimeoutNs}.
     * <p>
     * The scan is skipped when another thread already holds {@code cleanerLock}. When the scan is due,
     * every entry whose {@code expTime} lies before the current clock value is removed from
     * {@code pendingRequests} and its future is failed via {@link #setTimeoutException}; afterwards
     * the next scan time is re-armed.
     * <p>
     * Clock values are compared by the sign of their difference rather than with {@code <}, as the
     * {@link System#nanoTime()} contract requires, so that a wrapped value (for example a deadline
     * computed from a very large timeout) is ordered correctly.
     *
     * @return {@code false} if the lock could not be acquired, {@code true} otherwise
     *         (whether or not a scan was actually due)
     */
    private boolean tryCleanStaleRequests() {
        if (!this.cleanerLock.tryLock()) {
            return false;
        }
        try {
            log.trace("tryCleanStaleRequest...");
            final long currentNs = this.getCurrentClockNs();
            if (currentNs - this.nextCleanupNs > 0) {
                this.pendingRequests.forEach((key, value) -> {
                    if (currentNs - value.expTime > 0) {
                        // remove() may return null if a response or send failure won the race for this entry.
                        ResponseMetaData<Response> staleRequest = this.pendingRequests.remove(key);
                        if (staleRequest != null) {
                            this.setTimeoutException(key, staleRequest, currentNs);
                        }
                    }
                });
                this.setupNextCleanup();
            }
        } finally {
            this.cleanerLock.unlock();
        }
        return true;
    }

    /**
     * Schedules the next stale-request scan {@code maxRequestTimeoutNs} after the current clock value.
     */
    void setupNextCleanup() {
        long nowNs = this.getCurrentClockNs();
        this.nextCleanupNs = nowNs + this.maxRequestTimeoutNs;
        log.trace("setupNextCleanup {}", this.nextCleanupNs);
    }

    /**
     * Polls the response consumer once, passing {@code pollInterval} as the poll argument.
     *
     * @return whatever the consumer's {@code poll} call returns
     */
    List<Response> doPoll() {
        return this.responseTemplate.poll(this.pollInterval);
    }

    /**
     * Parks the calling thread for up to {@code nanos} nanoseconds.
     * Per {@link LockSupport#parkNanos(long)}, the call may return early (for example when the
     * thread is interrupted) and does not report why it returned.
     *
     * @param nanos maximum number of nanoseconds to wait
     */
    void sleep(long nanos) {
        LockSupport.parkNanos(nanos);
    }

    /**
     * Fails the future of an expired request with a {@link TimeoutException} and logs the event.
     * <p>
     * The message is logged at debug level when {@code currentNs} has reached the request's
     * {@code submitTime + timeout}, and at info level otherwise.
     *
     * @param key          id of the request that was removed from the pending map
     * @param staleRequest metadata of the removed request
     * @param currentNs    clock value the caller used to decide the request is stale
     */
    void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long currentNs) {
        final String message = "Request timeout detected, currentNs [{}], {}, key [{}]";
        final long declaredDeadlineNs = staleRequest.getSubmitTime() + staleRequest.getTimeout();
        if (currentNs < declaredDeadlineNs) {
            // Expired according to expTime but not according to submitTime + timeout: log more visibly.
            log.info(message, currentNs, staleRequest, key);
        } else {
            log.debug(message, currentNs, staleRequest, key);
        }
        staleRequest.future.setException(new TimeoutException());
    }

    /**
     * Matches one response to its pending request and completes that request's future.
     * <p>
     * A response without a {@code "requestId"} header is logged as an error and dropped. A response
     * whose id is no longer in {@code pendingRequests} is logged at debug level and dropped.
     *
     * @param response response polled from the response consumer
     */
    void processResponse(Response response) {
        byte[] rawRequestId = response.getHeaders().get("requestId");
        if (rawRequestId == null) {
            log.error("[{}] Missing requestId in header and body", response);
            return;
        }
        UUID requestId = DefaultTbQueueRequestTemplate.bytesToUuid(rawRequestId);
        log.trace("[{}] Response received: {}", requestId, response);
        // remove() claims the entry, so a given request is completed by at most one response.
        ResponseMetaData<Response> pending = this.pendingRequests.remove(requestId);
        if (pending != null) {
            pending.future.set(response);
            return;
        }
        log.debug("[{}] Invalid or stale request, response: {}", requestId, String.valueOf(response).replace("\n", " "));
    }

    /**
     * Stops the template: flags the polling loop to exit, unsubscribes the response consumer,
     * stops the request producer, and shuts down the executor if this instance created it.
     * An executor supplied by the caller is left running.
     */
    public void stop() {
        // Raise the flag first so mainLoop() exits at its next check.
        this.stopped = true;

        final TbQueueConsumer<Response> consumer = this.responseTemplate;
        if (consumer != null) {
            consumer.unsubscribe();
        }

        final TbQueueProducer<Request> producer = this.requestTemplate;
        if (producer != null) {
            producer.stop();
        }

        if (!this.internalExecutor) {
            return;
        }
        this.executor.shutdownNow();
    }

    /**
     * Sets the counters that are incremented when a request is sent and when the producer reports
     * success or failure. Every use of the field is guarded by a null check.
     *
     * @param messagesStats counters to update
     */
    public void setMessagesStats(MessagesStats messagesStats) {
        this.messagesStats = messagesStats;
    }

    /**
     * Sends a request using the default timeout ({@code maxRequestTimeoutNs}) and no explicit partition.
     *
     * @param request request to send
     * @return future of the response
     */
    public ListenableFuture<Response> send(Request request) {
        final long defaultTimeoutNs = this.maxRequestTimeoutNs;
        return send(request, defaultTimeoutNs);
    }

    /**
     * Sends a request with an explicit timeout and no explicit partition.
     *
     * @param request          request to send
     * @param requestTimeoutNs timeout in nanoseconds after which the pending request becomes stale
     * @return future of the response
     */
    public ListenableFuture<Response> send(Request request, long requestTimeoutNs) {
        final Integer noPartition = null;
        return send(request, requestTimeoutNs, noPartition);
    }

    /**
     * Sends a request to a specific partition using the default timeout ({@code maxRequestTimeoutNs}).
     *
     * @param request   request to send
     * @param partition partition value placed into the target {@code TopicPartitionInfo}
     * @return future of the response
     */
    public ListenableFuture<Response> send(Request request, Integer partition) {
        final long defaultTimeoutNs = this.maxRequestTimeoutNs;
        return send(request, defaultTimeoutNs, partition);
    }

    /**
     * Registers a new pending request and hands it to the request producer.
     * <p>
     * The request is rejected with an already-failed future when {@code pendingRequests} has reached
     * {@code maxPendingRequests}. Otherwise a random request id is generated, the
     * {@code "requestId"}, {@code "responseTopic"} and {@code "expireTs"} headers are written into
     * the request, and the request is tracked in {@code pendingRequests} until a response arrives,
     * the producer reports a failure, or the stale-request cleanup expires it.
     * <p>
     * The capacity check and the insertion are separate steps, so concurrent callers may briefly
     * push the map slightly above {@code maxPendingRequests}.
     *
     * @param request          request to send; its headers are modified
     * @param requestTimeoutNs timeout in nanoseconds used to compute the pending entry's expiration
     * @param partition        partition value for the target topic; public overloads pass {@code null} when none is given
     * @return future that is completed with the response or failed with the reason it could not be obtained
     */
    private ListenableFuture<Response> send(Request request, long requestTimeoutNs, Integer partition) {
        if (this.pendingRequests.mappingCount() >= this.maxPendingRequests) {
            log.warn("Pending request map is full [{}]! Consider to increase maxPendingRequests or increase processing performance. Request is {}", this.maxPendingRequests, request);
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.getHeaders().put("requestId", DefaultTbQueueRequestTemplate.uuidToBytes(requestId));
        request.getHeaders().put("responseTopic", DefaultTbQueueRequestTemplate.stringToBytes(this.responseTemplate.getTopic()));
        // NOTE(review): the header is always derived from maxRequestTimeout (ms), not from requestTimeoutNs — confirm intended.
        request.getHeaders().put("expireTs", DefaultTbQueueRequestTemplate.longToBytes(this.getCurrentTimeMs() + this.maxRequestTimeout));
        long currentClockNs = this.getCurrentClockNs();
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(currentClockNs + requestTimeoutNs, future, currentClockNs, requestTimeoutNs);
        log.trace("pending {}", responseMetaData);
        if (this.pendingRequests.putIfAbsent(requestId, responseMetaData) != null) {
            // Log the colliding id; the map capacity is irrelevant to this condition.
            log.warn("Pending request already exists [{}]!", requestId);
            return Futures.immediateFailedFuture(new RuntimeException("Pending request already exists !" + requestId));
        }
        this.sendToRequestTemplate(request, requestId, partition, future, responseMetaData);
        return future;
    }

    /**
     * Returns the clock value used for request submit times, expirations and cleanup scheduling.
     *
     * @return the current value of {@link System#nanoTime()}
     */
    long getCurrentClockNs() {
        return System.nanoTime();
    }

    /**
     * Returns the wall-clock time used to compute the {@code "expireTs"} request header.
     *
     * @return the current value of {@link System#currentTimeMillis()}
     */
    long getCurrentTimeMs() {
        return System.currentTimeMillis();
    }

    /**
     * Hands an already-registered request to the request producer.
     * <p>
     * The request goes to the producer's default topic with the given partition. If the producer
     * reports a failure, the pending entry is removed and the future is failed with the reported
     * cause; on success only statistics and trace logging are updated, and the future stays open
     * until a response or a timeout completes it.
     *
     * @param request          request to send
     * @param requestId        id under which the request is tracked in {@code pendingRequests}
     * @param partition        partition value placed into the target {@code TopicPartitionInfo}
     * @param future           future returned to the caller of {@code send}
     * @param responseMetaData pending-request metadata; used here for trace logging only
     */
    void sendToRequestTemplate(Request request, final UUID requestId, Integer partition, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {
        log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);
        if (this.messagesStats != null) {
            this.messagesStats.incrementTotal();
        }
        TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(this.requestTemplate.getDefaultTopic()).partition(partition).build();
        this.requestTemplate.send(tpi, request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                if (DefaultTbQueueRequestTemplate.this.messagesStats != null) {
                    DefaultTbQueueRequestTemplate.this.messagesStats.incrementSuccessful();
                }
                log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);
            }

            @Override
            public void onFailure(Throwable t) {
                if (DefaultTbQueueRequestTemplate.this.messagesStats != null) {
                    DefaultTbQueueRequestTemplate.this.messagesStats.incrementFailed();
                }
                // The request never left: stop tracking it and surface the producer's error to the caller.
                DefaultTbQueueRequestTemplate.this.pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
    }

    /**
     * Creates an empty builder for {@link DefaultTbQueueRequestTemplate}.
     *
     * @return a new builder with no properties set
     */
    @Generated
    public static <Request extends TbQueueMsg, Response extends TbQueueMsg> DefaultTbQueueRequestTemplateBuilder<Request, Response> builder() {
        return new DefaultTbQueueRequestTemplateBuilder<>();
    }

    /**
     * Immutable bookkeeping record for one in-flight request: when it was submitted, the timeout it
     * was given, the clock value at which it expires, and the future to complete.
     *
     * @param <T> response type carried by the future
     */
    static class ResponseMetaData<T> {
        private final long submitTime;
        private final long timeout;
        private final long expTime;
        private final SettableFuture<T> future;

        /**
         * @param ts         expiration clock value, stored as {@code expTime}
         * @param future     future to complete with the response or a failure
         * @param submitTime clock value at submission
         * @param timeout    timeout granted to the request
         */
        ResponseMetaData(long ts, SettableFuture<T> future, long submitTime, long timeout) {
            this.expTime = ts;
            this.future = future;
            this.submitTime = submitTime;
            this.timeout = timeout;
        }

        /**
         * Renders all fields plus two derived values for log output.
         * NOTE(review): the "deltaMs" label prints {@code expTime - submitTime} without any unit
         * conversion; the caller in this file supplies nanosecond values.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ResponseMetaData{");
            text.append("submitTime=").append(this.submitTime);
            text.append(", calculatedExpTime=").append(this.submitTime + this.timeout);
            text.append(", deltaMs=").append(this.expTime - this.submitTime);
            text.append(", expTime=").append(this.expTime);
            text.append(", future=").append(this.future);
            text.append("}");
            return text.toString();
        }

        /** @return clock value at submission */
        @Generated
        public long getSubmitTime() {
            return this.submitTime;
        }

        /** @return timeout granted to the request */
        @Generated
        public long getTimeout() {
            return this.timeout;
        }

        /** @return expiration clock value */
        @Generated
        public long getExpTime() {
            return this.expTime;
        }

        /** @return future to complete with the response or a failure */
        @Generated
        public SettableFuture<T> getFuture() {
            return this.future;
        }
    }

    /**
     * Fluent builder for {@link DefaultTbQueueRequestTemplate}. Each setter stores its argument and
     * returns the builder; {@link #build()} passes the stored values to the template's constructor.
     * Properties that are never set keep their Java defaults ({@code null} / {@code 0}).
     */
    @Generated
    public static class DefaultTbQueueRequestTemplateBuilder<Request extends TbQueueMsg, Response extends TbQueueMsg> {
        @Generated
        private TbQueueAdmin queueAdmin;
        @Generated
        private TbQueueProducer<Request> requestTemplate;
        @Generated
        private TbQueueConsumer<Response> responseTemplate;
        @Generated
        private long maxRequestTimeout;
        @Generated
        private long maxPendingRequests;
        @Generated
        private long pollInterval;
        @Generated
        private ExecutorService executor;

        @Generated
        DefaultTbQueueRequestTemplateBuilder() {
        }

        /** Sets the queue admin passed to the template. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> queueAdmin(TbQueueAdmin queueAdmin) {
            this.queueAdmin = queueAdmin;
            return this;
        }

        /** Sets the request producer passed to the template. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> requestTemplate(TbQueueProducer<Request> requestTemplate) {
            this.requestTemplate = requestTemplate;
            return this;
        }

        /** Sets the response consumer passed to the template. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> responseTemplate(TbQueueConsumer<Response> responseTemplate) {
            this.responseTemplate = responseTemplate;
            return this;
        }

        /** Sets the maximum request timeout passed to the template. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> maxRequestTimeout(long maxRequestTimeout) {
            this.maxRequestTimeout = maxRequestTimeout;
            return this;
        }

        /** Sets the maximum number of pending requests passed to the template. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> maxPendingRequests(long maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
            return this;
        }

        /** Sets the poll interval passed to the template. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        /** Sets the executor passed to the template; may be {@code null}. */
        @Generated
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> executor(@Nullable ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /**
         * Creates the template from the values collected so far.
         *
         * @return a new {@link DefaultTbQueueRequestTemplate}
         */
        @Generated
        public DefaultTbQueueRequestTemplate<Request, Response> build() {
            return new DefaultTbQueueRequestTemplate<>(this.queueAdmin, this.requestTemplate, this.responseTemplate, this.maxRequestTimeout, this.maxPendingRequests, this.pollInterval, this.executor);
        }

        /** Renders every builder property for diagnostics. */
        @Generated
        public String toString() {
            StringBuilder text = new StringBuilder("DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder(");
            text.append("queueAdmin=").append(this.queueAdmin);
            text.append(", requestTemplate=").append(this.requestTemplate);
            text.append(", responseTemplate=").append(this.responseTemplate);
            text.append(", maxRequestTimeout=").append(this.maxRequestTimeout);
            text.append(", maxPendingRequests=").append(this.maxPendingRequests);
            text.append(", pollInterval=").append(this.pollInterval);
            text.append(", executor=").append(this.executor);
            text.append(")");
            return text.toString();
        }
    }
}

