package org.thingsboard.server.queue.common;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;

/* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.class */
public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate implements TbQueueRequestTemplate<Request, Response> {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbQueueRequestTemplate.class);
    private final TbQueueAdmin queueAdmin;
    private final TbQueueProducer<Request> requestTemplate;
    private final TbQueueConsumer<Response> responseTemplate;
    final boolean internalExecutor;
    final ExecutorService executor;
    final long maxRequestTimeoutNs;
    final long maxRequestTimeout;
    final long maxPendingRequests;
    final long pollInterval;
    private MessagesStats messagesStats;
    final ConcurrentHashMap<UUID, ResponseMetaData<Response>> pendingRequests = new ConcurrentHashMap<>();
    volatile boolean stopped = false;
    long nextCleanupNs = 0;
    private final Lock cleanerLock = new ReentrantLock();

    /* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate$DefaultTbQueueRequestTemplateBuilder.class */
    /**
     * Fluent builder for {@link DefaultTbQueueRequestTemplate}.
     *
     * <p>Every setter stores its argument and returns this builder; {@link #build()} forwards the
     * collected values, unchanged, to the template constructor. Values that were never set keep
     * their Java defaults ({@code null} / {@code 0}).
     */
    public static class DefaultTbQueueRequestTemplateBuilder<Request extends TbQueueMsg, Response extends TbQueueMsg> {
        private TbQueueAdmin queueAdmin;
        private TbQueueProducer<Request> requestTemplate;
        private TbQueueConsumer<Response> responseTemplate;
        private long maxRequestTimeout;
        private long maxPendingRequests;
        private long pollInterval;
        private ExecutorService executor;

        DefaultTbQueueRequestTemplateBuilder() {
        }

        /** Sets the queue admin used by the template to create the response topic. */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> queueAdmin(TbQueueAdmin queueAdmin) {
            this.queueAdmin = queueAdmin;
            return this;
        }

        /** Sets the producer used to publish requests. */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> requestTemplate(TbQueueProducer<Request> requestTemplate) {
            this.requestTemplate = requestTemplate;
            return this;
        }

        /** Sets the consumer used to poll responses. */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> responseTemplate(TbQueueConsumer<Response> responseTemplate) {
            this.responseTemplate = responseTemplate;
            return this;
        }

        /** Sets the maximum request timeout (the template converts it from milliseconds to nanoseconds). */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> maxRequestTimeout(long maxRequestTimeout) {
            this.maxRequestTimeout = maxRequestTimeout;
            return this;
        }

        /** Sets the maximum number of requests that may be awaiting a response at once. */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> maxPendingRequests(long maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
            return this;
        }

        /** Sets the poll interval passed to the response consumer on every poll. */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        /**
         * Sets the executor that runs the response loop.
         *
         * @param executor executor to use, or {@code null} to let the template create its own
         */
        public DefaultTbQueueRequestTemplateBuilder<Request, Response> executor(@Nullable ExecutorService executor) {
            this.executor = executor;
            return this;
        }

        /** Creates the template from the values collected so far. */
        public DefaultTbQueueRequestTemplate<Request, Response> build() {
            return new DefaultTbQueueRequestTemplate<>(this.queueAdmin, this.requestTemplate, this.responseTemplate, this.maxRequestTimeout, this.maxPendingRequests, this.pollInterval, this.executor);
        }

        /** Returns a debug representation in which every label is paired with its own field. */
        @Override
        public String toString() {
            return "DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder(queueAdmin=" + this.queueAdmin
                    + ", requestTemplate=" + this.requestTemplate
                    + ", responseTemplate=" + this.responseTemplate
                    + ", maxRequestTimeout=" + this.maxRequestTimeout
                    + ", maxPendingRequests=" + this.maxPendingRequests
                    + ", pollInterval=" + this.pollInterval
                    + ", executor=" + this.executor + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate$ResponseMetaData.class */
    /**
     * Book-keeping record for one in-flight request: when it was submitted, how long it may take,
     * when it expires, and the future that is completed once the outcome is known.
     *
     * <p>The enclosing template fills the time fields from {@code System.nanoTime()}-based values.
     */
    public static class ResponseMetaData<T> {
        private final long submitTime;
        private final long timeout;
        private final long expTime;
        private final SettableFuture<T> future;

        /**
         * @param expTime    clock value after which the request is considered stale
         * @param future     future to complete with the response or with a failure
         * @param submitTime clock value at which the request was submitted
         * @param timeout    allowed duration, in the same unit as {@code submitTime}
         */
        ResponseMetaData(long expTime, SettableFuture<T> future, long submitTime, long timeout) {
            this.submitTime = submitTime;
            this.timeout = timeout;
            this.expTime = expTime;
            this.future = future;
        }

        /**
         * Returns a debug representation used in timeout log messages.
         *
         * <p>{@code calculatedExpTime} is {@code submitTime + timeout}; {@code deltaMs} is
         * {@code expTime - submitTime}. The {@code deltaMs} label is kept for log compatibility
         * even though the value is in whatever unit the time fields use.
         */
        @Override
        public String toString() {
            return "ResponseMetaData{submitTime=" + this.submitTime
                    + ", calculatedExpTime=" + (this.submitTime + this.timeout)
                    + ", deltaMs=" + (this.expTime - this.submitTime)
                    + ", expTime=" + this.expTime
                    + ", future=" + this.future + "}";
        }

        public long getSubmitTime() {
            return this.submitTime;
        }

        public long getTimeout() {
            return this.timeout;
        }

        public long getExpTime() {
            return this.expTime;
        }

        public SettableFuture<T> getFuture() {
            return this.future;
        }
    }

    /**
     * Creates a request template.
     *
     * @param queueAdmin         admin used to create the response topic in {@link #init()}
     * @param requestTemplate    producer used to publish requests
     * @param responseTemplate   consumer used to poll responses
     * @param maxRequestTimeout  maximum request timeout in milliseconds
     * @param maxPendingRequests maximum number of requests awaiting a response
     * @param pollInterval       poll interval passed to the response consumer
     * @param executor           executor for the response loop; when {@code null} an internal
     *                           single-threaded executor is created and later shut down by {@link #stop()}
     */
    public DefaultTbQueueRequestTemplate(TbQueueAdmin queueAdmin,
                                         TbQueueProducer<Request> requestTemplate,
                                         TbQueueConsumer<Response> responseTemplate,
                                         long maxRequestTimeout,
                                         long maxPendingRequests,
                                         long pollInterval,
                                         @Nullable ExecutorService executor) {
        this.queueAdmin = queueAdmin;
        this.requestTemplate = requestTemplate;
        this.responseTemplate = responseTemplate;
        this.maxRequestTimeout = maxRequestTimeout;
        this.maxRequestTimeoutNs = TimeUnit.MILLISECONDS.toNanos(maxRequestTimeout);
        this.maxPendingRequests = maxPendingRequests;
        this.pollInterval = pollInterval;
        if (executor == null) {
            // No executor supplied: this template owns the one it creates.
            this.internalExecutor = true;
            this.executor = createExecutor();
        } else {
            this.internalExecutor = false;
            this.executor = executor;
        }
    }

    /**
     * Creates the internal single-threaded executor; its thread name is derived from the
     * response topic.
     */
    ExecutorService createExecutor() {
        final String threadName = "tb-queue-request-template-" + responseTemplate.getTopic();
        return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(threadName));
    }

    /**
     * Prepares the template for use: ensures the response topic exists, initializes the request
     * producer, subscribes the response consumer and submits the response loop to the executor.
     */
    public void init() {
        final String responseTopic = responseTemplate.getTopic();
        queueAdmin.createTopicIfNotExists(responseTopic);
        requestTemplate.init();
        responseTemplate.subscribe();
        final Runnable responseLoop = this::mainLoop;
        executor.submit(responseLoop);
    }

    /**
     * Response loop: keeps fetching and processing responses until {@code stopped} is set.
     *
     * <p>When an iteration fails, the failure is logged and the thread sleeps for the remainder
     * of the poll interval (poll interval minus the time the failed iteration took).
     */
    void mainLoop() {
        while (!stopped) {
            final TbStopWatch stopWatch = TbStopWatch.create();
            try {
                fetchAndProcessResponses();
            } catch (Throwable failure) {
                final long pollIntervalNs = TimeUnit.MILLISECONDS.toNanos(pollInterval);
                final long sleepNanos = pollIntervalNs - stopWatch.stopAndGetTotalTimeNanos();
                log.warn("Failed to obtain and process responses from queue. Going to sleep " + sleepNanos + "ns", failure);
                sleep(sleepNanos);
            }
        }
    }

    /**
     * Runs one iteration of the response loop: polls the response consumer, dispatches every
     * received response, commits the consumer and then attempts a stale-request cleanup.
     */
    void fetchAndProcessResponses() {
        final long pendingCount = pendingRequests.mappingCount();
        log.trace("Starting template pool topic {}, for pendingRequests {}", responseTemplate.getTopic(), pendingCount);
        final List<Response> responses = doPoll();
        log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}] responses", responseTemplate.getTopic(), pendingCount, responses.size());
        for (Response response : responses) {
            processResponse(response);
        }
        responseTemplate.commit();
        tryCleanStaleRequests();
    }

    /**
     * Fails every pending request whose expiration time has passed, at most once per
     * {@code maxRequestTimeoutNs} window.
     *
     * @return {@code false} when the cleaner lock is held elsewhere and nothing was attempted;
     *         {@code true} otherwise, whether or not a cleanup pass was due
     */
    private boolean tryCleanStaleRequests() {
        if (!cleanerLock.tryLock()) {
            return false;
        }
        try {
            log.trace("tryCleanStaleRequest...");
            final long nowNs = getCurrentClockNs();
            // NOTE(review): clock values are compared directly, not by difference — confirm this is acceptable for nanoTime-based values.
            if (nextCleanupNs < nowNs) {
                pendingRequests.forEach((requestId, metaData) -> {
                    if (metaData.getExpTime() < nowNs) {
                        // Only the caller that actually removes the entry completes its future.
                        final ResponseMetaData<Response> stale = pendingRequests.remove(requestId);
                        if (stale != null) {
                            setTimeoutException(requestId, stale, nowNs);
                        }
                    }
                });
                setupNextCleanup();
            }
            return true;
        } finally {
            cleanerLock.unlock();
        }
    }

    /** Schedules the next stale-request cleanup one maximum request timeout from now. */
    void setupNextCleanup() {
        final long nowNs = getCurrentClockNs();
        nextCleanupNs = nowNs + maxRequestTimeoutNs;
        log.trace("setupNextCleanup {}", nextCleanupNs);
    }

    /** Polls the response consumer once, passing the configured poll interval. */
    List<Response> doPoll() {
        return responseTemplate.poll(pollInterval);
    }

    /**
     * Parks the current thread for up to the given number of nanoseconds.
     *
     * @param nanos maximum time to park, in nanoseconds
     */
    void sleep(long nanos) {
        LockSupport.parkNanos(nanos);
    }

    /**
     * Logs a detected request timeout and fails the request's future with a
     * {@link TimeoutException}.
     *
     * <p>The message is logged at debug level when {@code currentNs} has reached
     * {@code submitTime + timeout}, and at info level otherwise.
     *
     * @param key          id of the timed-out request
     * @param staleRequest metadata of the timed-out request
     * @param currentNs    clock value at which the timeout was detected
     */
    void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long currentNs) {
        final String message = "Request timeout detected, currentNs [{}], {}, key [{}]";
        final long calculatedExpTime = staleRequest.getSubmitTime() + staleRequest.getTimeout();
        if (currentNs < calculatedExpTime) {
            log.info(message, currentNs, staleRequest, key);
        } else {
            log.debug(message, currentNs, staleRequest, key);
        }
        staleRequest.getFuture().setException(new TimeoutException());
    }

    /**
     * Matches a received response to its pending request via the {@code requestId} header and
     * completes the corresponding future.
     *
     * <p>Responses without the header are logged as errors; responses whose request is no longer
     * pending are logged at debug level and dropped.
     */
    void processResponse(Response response) {
        final byte[] requestIdHeader = response.getHeaders().get("requestId");
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header and body", response);
            return;
        }
        final UUID requestId = bytesToUuid(requestIdHeader);
        log.trace("[{}] Response received: {}", requestId, response);
        final ResponseMetaData<Response> expected = pendingRequests.remove(requestId);
        if (expected != null) {
            expected.getFuture().set(response);
            return;
        }
        log.debug("[{}] Invalid or stale request, response: {}", requestId, String.valueOf(response).replace("\n", " "));
    }

    /**
     * Stops the template: flags the response loop to exit, unsubscribes the response consumer,
     * stops the request producer and, if the executor was created internally, shuts it down.
     * An externally supplied executor is left running.
     */
    public void stop() {
        stopped = true;
        if (responseTemplate != null) {
            responseTemplate.unsubscribe();
        }
        if (requestTemplate != null) {
            requestTemplate.stop();
        }
        if (!internalExecutor) {
            return;
        }
        executor.shutdownNow();
    }

    /**
     * Sets the statistics collector that is incremented when requests are sent.
     * When none is set ({@code null}), sending skips all statistics updates.
     */
    public void setMessagesStats(MessagesStats messagesStats) {
        this.messagesStats = messagesStats;
    }

    /**
     * Sends a request using the maximum request timeout.
     *
     * @param request request to send
     * @return future completed with the response, or failed on timeout or send failure
     */
    public ListenableFuture<Response> send(Request request) {
        return send(request, maxRequestTimeoutNs);
    }

    /**
     * Sends a request and registers it as pending until a response arrives or it times out.
     *
     * <p>The request headers {@code requestId}, {@code responseTopic} and {@code expireTs} are
     * populated before the request is handed to the producer.
     *
     * @param request          request to send; its headers are modified
     * @param requestTimeoutNs time, in nanoseconds, after which the pending request is considered stale
     * @return future completed with the response; an already-failed future is returned when the
     *         pending-request limit is reached or the generated request id is already registered
     */
    public ListenableFuture<Response> send(Request request, long requestTimeoutNs) {
        if (pendingRequests.mappingCount() >= maxPendingRequests) {
            log.warn("Pending request map is full [{}]! Consider to increase maxPendingRequests or increase processing performance. Request is {}", maxPendingRequests, request);
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.getHeaders().put("requestId", uuidToBytes(requestId));
        request.getHeaders().put("responseTopic", stringToBytes(responseTemplate.getTopic()));
        // NOTE(review): expireTs is derived from maxRequestTimeout, not from requestTimeoutNs — confirm this is intended.
        request.getHeaders().put("expireTs", longToBytes(getCurrentTimeMs() + maxRequestTimeout));
        long currentClockNs = getCurrentClockNs();
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(currentClockNs + requestTimeoutNs, future, currentClockNs, requestTimeoutNs);
        log.trace("pending {}", responseMetaData);
        if (pendingRequests.putIfAbsent(requestId, responseMetaData) != null) {
            // Log the colliding request id; the pending-request limit is irrelevant to this failure.
            log.warn("Pending request already exists [{}]!", requestId);
            return Futures.immediateFailedFuture(new RuntimeException("Pending request already exists !" + requestId));
        }
        sendToRequestTemplate(request, requestId, future, responseMetaData);
        return future;
    }

    /**
     * Returns the current value of {@link System#nanoTime()}; used for request submit and
     * expiration bookkeeping.
     */
    long getCurrentClockNs() {
        return System.nanoTime();
    }

    /**
     * Returns the current wall-clock time from {@link System#currentTimeMillis()}; used to
     * compute the {@code expireTs} request header.
     */
    long getCurrentTimeMs() {
        return System.currentTimeMillis();
    }

    /**
     * Publishes the request to the producer's default topic and updates the message statistics,
     * when a statistics collector is set.
     *
     * <p>On send failure the request is removed from the pending map and its future is failed
     * with the reported cause. On success only the statistics and trace log are updated.
     *
     * @param request          request to publish
     * @param requestId        id under which the request is registered as pending
     * @param future           future to fail if the send fails
     * @param responseMetaData metadata of the pending request (used for trace logging)
     */
    void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {
        log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.getExpTime(), request);
        if (messagesStats != null) {
            messagesStats.incrementTotal();
        }
        final TopicPartitionInfo target = TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build();
        requestTemplate.send(target, request, new TbQueueCallback() {
            public void onSuccess(TbQueueMsgMetadata metadata) {
                if (messagesStats != null) {
                    messagesStats.incrementSuccessful();
                }
                log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);
            }

            public void onFailure(Throwable cause) {
                if (messagesStats != null) {
                    messagesStats.incrementFailed();
                }
                // Drop the pending entry first so a late response is treated as stale.
                pendingRequests.remove(requestId);
                future.setException(cause);
            }
        });
    }

    /** Creates a new, empty builder for a {@link DefaultTbQueueRequestTemplate}. */
    public static <Request extends TbQueueMsg, Response extends TbQueueMsg> DefaultTbQueueRequestTemplateBuilder<Request, Response> builder() {
        return new DefaultTbQueueRequestTemplateBuilder<>();
    }
}
