package org.thingsboard.server.queue.common;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;

/* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.class */
public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate implements TbQueueRequestTemplate<Request, Response> {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbQueueRequestTemplate.class);
    private final TbQueueAdmin queueAdmin;
    private final TbQueueProducer<Request> requestTemplate;
    private final TbQueueConsumer<Response> responseTemplate;
    private final boolean internalExecutor;
    private final ExecutorService executor;
    private final long maxRequestTimeout;
    private final long maxPendingRequests;
    private final long pollInterval;
    private MessagesStats messagesStats;
    private volatile long tickTs = 0;
    private volatile long tickSize = 0;
    private volatile boolean stopped = false;
    private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests = new ConcurrentHashMap();

    /* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate$DefaultTbQueueRequestTemplateBuilder.class */
    public static class DefaultTbQueueRequestTemplateBuilder<Request extends TbQueueMsg, Response extends TbQueueMsg> {
        private TbQueueAdmin queueAdmin;
        private TbQueueProducer<Request> requestTemplate;
        private TbQueueConsumer<Response> responseTemplate;
        private long maxRequestTimeout;
        private long maxPendingRequests;
        private long pollInterval;
        private ExecutorService executor;

        DefaultTbQueueRequestTemplateBuilder() {
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> queueAdmin(TbQueueAdmin tbQueueAdmin) {
            this.queueAdmin = tbQueueAdmin;
            return this;
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> requestTemplate(TbQueueProducer<Request> tbQueueProducer) {
            this.requestTemplate = tbQueueProducer;
            return this;
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> responseTemplate(TbQueueConsumer<Response> tbQueueConsumer) {
            this.responseTemplate = tbQueueConsumer;
            return this;
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> maxRequestTimeout(long j) {
            this.maxRequestTimeout = j;
            return this;
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> maxPendingRequests(long j) {
            this.maxPendingRequests = j;
            return this;
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> pollInterval(long j) {
            this.pollInterval = j;
            return this;
        }

        public DefaultTbQueueRequestTemplateBuilder<Request, Response> executor(ExecutorService executorService) {
            this.executor = executorService;
            return this;
        }

        public DefaultTbQueueRequestTemplate<Request, Response> build() {
            return new DefaultTbQueueRequestTemplate<>(this.queueAdmin, this.requestTemplate, this.responseTemplate, this.maxRequestTimeout, this.maxPendingRequests, this.pollInterval, this.executor);
        }

        public String toString() {
            return "DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder(queueAdmin=" + this.queueAdmin + ", requestTemplate=" + this.requestTemplate + ", responseTemplate=" + this.responseTemplate + ", maxRequestTimeout=" + this.maxRequestTimeout + ", maxPendingRequests=" + this.maxPendingRequests + ", pollInterval=" + this.pollInterval + ", executor=" + this.executor + ")";
        }
    }

    /* loaded from: input_file:org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate$ResponseMetaData.class */
    private static class ResponseMetaData<T> {
        private final long expTime;
        private final SettableFuture<T> future;

        ResponseMetaData(long j, SettableFuture<T> settableFuture) {
            this.expTime = j;
            this.future = settableFuture;
        }
    }

    public DefaultTbQueueRequestTemplate(TbQueueAdmin tbQueueAdmin, TbQueueProducer<Request> tbQueueProducer, TbQueueConsumer<Response> tbQueueConsumer, long j, long j2, long j3, ExecutorService executorService) {
        this.queueAdmin = tbQueueAdmin;
        this.requestTemplate = tbQueueProducer;
        this.responseTemplate = tbQueueConsumer;
        this.maxRequestTimeout = j;
        this.maxPendingRequests = j2;
        this.pollInterval = j3;
        if (executorService != null) {
            this.internalExecutor = false;
            this.executor = executorService;
        } else {
            this.internalExecutor = true;
            this.executor = Executors.newSingleThreadExecutor();
        }
    }

    @Override // org.thingsboard.server.queue.TbQueueRequestTemplate
    public void init() {
        this.queueAdmin.createTopicIfNotExists(this.responseTemplate.getTopic());
        this.requestTemplate.init();
        this.tickTs = System.currentTimeMillis();
        this.responseTemplate.subscribe();
        this.executor.submit(() -> {
            long j = 0;
            while (!this.stopped) {
                try {
                    List<Response> poll = this.responseTemplate.poll(this.pollInterval);
                    if (poll.size() > 0) {
                        log.trace("Polling responses completed, consumer records count [{}]", Integer.valueOf(poll.size()));
                    }
                    poll.forEach(tbQueueMsg -> {
                        byte[] bArr = tbQueueMsg.getHeaders().get("requestId");
                        if (bArr == null) {
                            log.error("[{}] Missing requestId in header and body", tbQueueMsg);
                            return;
                        }
                        UUID bytesToUuid = bytesToUuid(bArr);
                        log.trace("[{}] Response received: {}", bytesToUuid, tbQueueMsg);
                        ResponseMetaData<Response> remove = this.pendingRequests.remove(bytesToUuid);
                        if (remove == null) {
                            log.trace("[{}] Invalid or stale request", bytesToUuid);
                        } else {
                            ((ResponseMetaData) remove).future.set(tbQueueMsg);
                        }
                    });
                    this.responseTemplate.commit();
                    this.tickTs = System.currentTimeMillis();
                    this.tickSize = this.pendingRequests.size();
                    if (j < this.tickTs) {
                        this.pendingRequests.forEach((uuid, responseMetaData) -> {
                            ResponseMetaData<Response> remove;
                            if (responseMetaData.expTime >= this.tickTs || (remove = this.pendingRequests.remove(uuid)) == null) {
                                return;
                            }
                            log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", new Object[]{uuid, Long.valueOf(((ResponseMetaData) remove).expTime), Long.valueOf(this.tickTs)});
                            ((ResponseMetaData) remove).future.setException(new TimeoutException());
                        });
                        j = this.tickTs + this.maxRequestTimeout;
                    }
                } catch (Throwable th) {
                    log.warn("Failed to obtain responses from queue.", th);
                    try {
                        Thread.sleep(this.pollInterval);
                    } catch (InterruptedException e) {
                        log.trace("Failed to wait until the server has capacity to handle new responses", e);
                    }
                }
            }
        });
    }

    @Override // org.thingsboard.server.queue.TbQueueRequestTemplate
    public void stop() {
        this.stopped = true;
        if (this.responseTemplate != null) {
            this.responseTemplate.unsubscribe();
        }
        if (this.requestTemplate != null) {
            this.requestTemplate.stop();
        }
        if (this.internalExecutor) {
            this.executor.shutdownNow();
        }
    }

    @Override // org.thingsboard.server.queue.TbQueueRequestTemplate
    public void setMessagesStats(MessagesStats messagesStats) {
        this.messagesStats = messagesStats;
    }

    @Override // org.thingsboard.server.queue.TbQueueRequestTemplate
    public ListenableFuture<Response> send(Request request) {
        if (this.tickSize > this.maxPendingRequests) {
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        final UUID randomUUID = UUID.randomUUID();
        request.getHeaders().put("requestId", uuidToBytes(randomUUID));
        request.getHeaders().put("responseTopic", stringToBytes(this.responseTemplate.getTopic()));
        request.getHeaders().put("requestTime", longToBytes(System.currentTimeMillis()));
        final SettableFuture create = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(this.tickTs + this.maxRequestTimeout, create);
        this.pendingRequests.putIfAbsent(randomUUID, responseMetaData);
        log.trace("[{}] Sending request, key [{}], expTime [{}]", new Object[]{randomUUID, request.getKey(), Long.valueOf(((ResponseMetaData) responseMetaData).expTime)});
        if (this.messagesStats != null) {
            this.messagesStats.incrementTotal();
        }
        this.requestTemplate.send(TopicPartitionInfo.builder().topic(this.requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() { // from class: org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate.1
            @Override // org.thingsboard.server.queue.TbQueueCallback
            public void onSuccess(TbQueueMsgMetadata tbQueueMsgMetadata) {
                if (DefaultTbQueueRequestTemplate.this.messagesStats != null) {
                    DefaultTbQueueRequestTemplate.this.messagesStats.incrementSuccessful();
                }
                DefaultTbQueueRequestTemplate.log.trace("[{}] Request sent: {}", randomUUID, tbQueueMsgMetadata);
            }

            @Override // org.thingsboard.server.queue.TbQueueCallback
            public void onFailure(Throwable th) {
                if (DefaultTbQueueRequestTemplate.this.messagesStats != null) {
                    DefaultTbQueueRequestTemplate.this.messagesStats.incrementFailed();
                }
                DefaultTbQueueRequestTemplate.this.pendingRequests.remove(randomUUID);
                create.setException(th);
            }
        });
        return create;
    }

    public static <Request extends TbQueueMsg, Response extends TbQueueMsg> DefaultTbQueueRequestTemplateBuilder<Request, Response> builder() {
        return new DefaultTbQueueRequestTemplateBuilder<>();
    }
}
