package org.thingsboard.server.queue.common.consumer;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;

/* loaded from: input_file:org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.class */
public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
    private static final Logger log = LoggerFactory.getLogger(MainQueueConsumerManager.class);

    /** Identifier of the managed queue; used as the log prefix and as the key of single-consumer tasks. */
    protected final Object queueKey;

    /** Current queue configuration; replaced by {@link #init} and by config update tasks. May be {@code null} until initialized. */
    protected C config;

    /** Receives every non-empty batch polled by a consumer loop. */
    protected final MsgPackProcessor<M, C> msgPackProcessor;

    /** Factory for queue consumers; the partition argument is {@code null} when a single consumer serves all partitions. */
    protected final BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator;

    /** Executor on which the consumer loops run. */
    protected final ExecutorService consumerExecutor;

    /** Scheduler used to retry task processing when the processing lock is busy. */
    protected final ScheduledExecutorService scheduler;

    /** Executor on which queued manager tasks are processed. */
    protected final ExecutorService taskExecutor;

    /** Optional handler notified when a consumer loop dies with an unexpected {@link Throwable}; may be {@code null}. */
    protected final Consumer<Throwable> uncaughtErrorHandler;

    /** Pending manager tasks, appended by {@code addTask} and drained by {@code tryProcessTasks}. */
    private final Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>();

    /** Ensures only one thread drains {@link #tasks} at a time. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Partition set most recently applied to the consumer wrapper; {@code null} until the first partition update. */
    private volatile Set<TopicPartitionInfo> partitions;

    /** Active consumer strategy, created by {@link #init}; {@code null} while no configuration has been set. */
    protected volatile ConsumerWrapper<M> consumerWrapper;

    /** Set by {@link #stop()}; once {@code true}, new tasks are rejected and consumer loops exit. */
    protected volatile boolean stopped;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager$ConsumerPerPartitionWrapper.class */
    /**
     * Consumer strategy that runs one dedicated consumer task per assigned partition.
     *
     * <p>The internal map is a plain {@link HashMap}; this class performs no synchronization of its own.
     */
    public class ConsumerPerPartitionWrapper implements ConsumerWrapper<M> {
        /** Running consumer task for each currently assigned partition. */
        private final Map<TopicPartitionInfo, TbQueueConsumerTask<M>> consumers = new HashMap<>();

        ConsumerPerPartitionWrapper() {
        }

        /**
         * Reconciles the running consumers with the requested partition set: consumers of partitions
         * that are no longer present are stopped first, then consumers for new partitions are launched.
         *
         * @param set the complete set of partitions that should be consumed after this call
         */
        @Override
        public void updatePartitions(Set<TopicPartitionInfo> set) {
            Set<TopicPartitionInfo> added = new HashSet<>(set);
            added.removeAll(this.consumers.keySet());
            Set<TopicPartitionInfo> removed = new HashSet<>(this.consumers.keySet());
            removed.removeAll(set);
            MainQueueConsumerManager.log.info("[{}] Added partitions: {}, removed partitions: {}",
                    MainQueueConsumerManager.this.queueKey, added, removed);
            removePartitions(removed);
            addPartitions(added, null, null);
        }

        /**
         * Stops and forgets the consumers of the given partitions. Partitions without a consumer are ignored.
         *
         * @param set partitions whose consumers must be stopped
         */
        public void removePartitions(Set<TopicPartitionInfo> set) {
            // First pass only signals the stop, so that all consumers wind down in parallel.
            for (TopicPartitionInfo tpi : set) {
                Optional.ofNullable(this.consumers.get(tpi)).ifPresent(TbQueueConsumerTask::initiateStop);
            }
            // Second pass drops each consumer from the map and waits for it to finish.
            for (TopicPartitionInfo tpi : set) {
                Optional.ofNullable(this.consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion);
            }
        }

        /**
         * Creates, registers, subscribes and launches one consumer task per given partition.
         *
         * @param set                 partitions to start consuming
         * @param onStop              optional callback; when non-null it is handed to the consumer task as its
         *                            finish callback and receives the partition
         * @param startOffsetProvider optional start offset provider; when non-null it is applied to consumers that
         *                            are {@link TbKafkaConsumerTemplate} instances and ignored for other consumer types
         */
        public void addPartitions(Set<TopicPartitionInfo> set, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
            for (TopicPartitionInfo tpi : set) {
                // Task key is "<queueKey>-<partition>", with -1 standing in for an absent partition number.
                String taskKey = MainQueueConsumerManager.this.queueKey + "-" + tpi.getPartition().orElse(-1);
                TbQueueConsumerTask<M> task = new TbQueueConsumerTask<>(taskKey, () -> {
                    TbQueueConsumer<M> created = MainQueueConsumerManager.this.consumerCreator.apply(MainQueueConsumerManager.this.config, tpi);
                    if (startOffsetProvider != null && created instanceof TbKafkaConsumerTemplate) {
                        ((TbKafkaConsumerTemplate<M>) created).setStartOffsetProvider(startOffsetProvider);
                    }
                    return created;
                }, onStop != null ? () -> onStop.accept(tpi) : null);
                this.consumers.put(tpi, task);
                task.subscribe(Set.of(tpi));
                MainQueueConsumerManager.this.launchConsumer(task);
            }
        }

        /**
         * @return a live view of the consumer tasks currently registered in this wrapper
         */
        @Override
        public Collection<TbQueueConsumerTask<M>> getConsumers() {
            return this.consumers.values();
        }
    }

    /* loaded from: input_file:org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager$ConsumerWrapper.class */
    /**
     * Strategy that owns the consumer tasks of a queue and maps assigned partitions onto them.
     *
     * @param <M> type of the consumed queue messages
     */
    public interface ConsumerWrapper<M extends TbQueueMsg> {
        /**
         * Applies a new partition assignment.
         *
         * @param set the complete set of partitions that should be consumed from now on
         */
        void updatePartitions(Set<TopicPartitionInfo> set);

        /**
         * @return the consumer tasks currently held by this wrapper
         */
        Collection<TbQueueConsumerTask<M>> getConsumers();
    }

    /* loaded from: input_file:org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager$MainQueueConsumerManagerBuilder.class */
    /**
     * Fluent builder for {@link MainQueueConsumerManager}. Every setter stores its argument and returns
     * this builder; {@link #build()} forwards the collected values to the manager constructor unchanged.
     *
     * @param <M> type of the consumed queue messages
     * @param <C> type of the queue configuration
     */
    public static class MainQueueConsumerManagerBuilder<M extends TbQueueMsg, C extends QueueConfig> {
        private Object queueKey;
        private C config;
        private MsgPackProcessor<M, C> msgPackProcessor;
        private BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator;
        private ExecutorService consumerExecutor;
        private ScheduledExecutorService scheduler;
        private ExecutorService taskExecutor;
        private Consumer<Throwable> uncaughtErrorHandler;

        MainQueueConsumerManagerBuilder() {
        }

        /** Sets the identifier of the managed queue. */
        public MainQueueConsumerManagerBuilder<M, C> queueKey(Object queueKey) {
            this.queueKey = queueKey;
            return this;
        }

        /** Sets the initial queue configuration. */
        public MainQueueConsumerManagerBuilder<M, C> config(C config) {
            this.config = config;
            return this;
        }

        /** Sets the processor that receives polled message batches. */
        public MainQueueConsumerManagerBuilder<M, C> msgPackProcessor(MsgPackProcessor<M, C> msgPackProcessor) {
            this.msgPackProcessor = msgPackProcessor;
            return this;
        }

        /** Sets the factory used to create queue consumers. */
        public MainQueueConsumerManagerBuilder<M, C> consumerCreator(BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator) {
            this.consumerCreator = consumerCreator;
            return this;
        }

        /** Sets the executor that runs the consumer loops. */
        public MainQueueConsumerManagerBuilder<M, C> consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        /** Sets the scheduler used for delayed retries of task processing. */
        public MainQueueConsumerManagerBuilder<M, C> scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /** Sets the executor that processes queued manager tasks. */
        public MainQueueConsumerManagerBuilder<M, C> taskExecutor(ExecutorService taskExecutor) {
            this.taskExecutor = taskExecutor;
            return this;
        }

        /** Sets the handler notified about unexpected consumer loop failures. */
        public MainQueueConsumerManagerBuilder<M, C> uncaughtErrorHandler(Consumer<Throwable> uncaughtErrorHandler) {
            this.uncaughtErrorHandler = uncaughtErrorHandler;
            return this;
        }

        /**
         * @return a new manager constructed from the values collected so far
         */
        public MainQueueConsumerManager<M, C> build() {
            return new MainQueueConsumerManager<>(
                    this.queueKey,
                    this.config,
                    this.msgPackProcessor,
                    this.consumerCreator,
                    this.consumerExecutor,
                    this.scheduler,
                    this.taskExecutor,
                    this.uncaughtErrorHandler);
        }

        /**
         * @return a textual dump of all builder fields
         */
        @Override
        public String toString() {
            return "MainQueueConsumerManager.MainQueueConsumerManagerBuilder(queueKey=" + this.queueKey
                    + ", config=" + this.config
                    + ", msgPackProcessor=" + this.msgPackProcessor
                    + ", consumerCreator=" + this.consumerCreator
                    + ", consumerExecutor=" + this.consumerExecutor
                    + ", scheduler=" + this.scheduler
                    + ", taskExecutor=" + this.taskExecutor
                    + ", uncaughtErrorHandler=" + this.uncaughtErrorHandler
                    + ")";
        }
    }

    /* loaded from: input_file:org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager$MsgPackProcessor.class */
    /**
     * Callback that handles a batch of polled messages; {@code MainQueueConsumerManager#processMsgs}
     * delegates to it.
     *
     * @param <M> type of the consumed queue messages
     * @param <C> type of the queue configuration
     */
    public interface MsgPackProcessor<M extends TbQueueMsg, C extends QueueConfig> {
        /**
         * Processes one batch of messages.
         *
         * @param list            the polled messages
         * @param tbQueueConsumer the consumer the messages were polled from
         * @param c               the queue configuration in effect when the batch was polled
         * @throws Exception if processing fails
         */
        void process(List<M> list, TbQueueConsumer<M> tbQueueConsumer, C c) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager$SingleConsumerWrapper.class */
    /**
     * Consumer strategy that serves all assigned partitions with one shared consumer task.
     */
    public class SingleConsumerWrapper implements ConsumerWrapper<M> {
        /** The shared consumer task; {@code null} while no partitions are assigned. */
        private TbQueueConsumerTask<M> consumer;

        SingleConsumerWrapper() {
        }

        /**
         * Re-subscribes the shared consumer to the given partitions, creating and launching it on demand.
         * An empty set stops a running consumer and discards it.
         *
         * @param set the complete set of partitions that should be consumed after this call
         */
        @Override
        public void updatePartitions(Set<TopicPartitionInfo> set) {
            log.info("[{}] New partitions: {}", queueKey, set);
            if (!set.isEmpty()) {
                if (this.consumer == null) {
                    // The creator gets a null partition because this consumer is not bound to a single one.
                    this.consumer = new TbQueueConsumerTask<>(queueKey, () -> consumerCreator.apply(config, null), null);
                }
                this.consumer.subscribe(set);
                if (!this.consumer.isRunning()) {
                    launchConsumer(this.consumer);
                }
                return;
            }
            // Nothing left to consume: shut a running consumer down, then drop the reference.
            if (this.consumer != null && this.consumer.isRunning()) {
                this.consumer.initiateStop();
                this.consumer.awaitCompletion();
            }
            this.consumer = null;
        }

        /**
         * @return a one-element list with the shared consumer task, or an empty list when there is none
         */
        @Override
        public Collection<TbQueueConsumerTask<M>> getConsumers() {
            if (this.consumer == null) {
                return Collections.emptyList();
            }
            return List.of(this.consumer);
        }
    }

    /**
     * Creates a manager and, when a configuration is supplied, initializes its consumer wrapper right away.
     *
     * @param queueKey             identifier of the managed queue
     * @param config               initial queue configuration; when {@code null}, initialization is skipped
     * @param msgPackProcessor     processor that receives polled message batches
     * @param consumerCreator      factory used to create queue consumers
     * @param consumerExecutor     executor that runs the consumer loops
     * @param scheduler            scheduler used for delayed retries of task processing
     * @param taskExecutor         executor that processes queued manager tasks
     * @param uncaughtErrorHandler handler notified about unexpected consumer loop failures; may be {@code null}
     */
    public MainQueueConsumerManager(Object queueKey, C config, MsgPackProcessor<M, C> msgPackProcessor, BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor, Consumer<Throwable> uncaughtErrorHandler) {
        this.queueKey = queueKey;
        this.config = config;
        this.msgPackProcessor = msgPackProcessor;
        this.consumerCreator = consumerCreator;
        this.consumerExecutor = consumerExecutor;
        this.scheduler = scheduler;
        this.taskExecutor = taskExecutor;
        this.uncaughtErrorHandler = uncaughtErrorHandler;
        if (config == null) {
            // Without a configuration the consumer wrapper stays unset until init(...) is called.
            return;
        }
        init(config);
    }

    /**
     * Stores the given configuration and installs a fresh consumer wrapper created for it.
     *
     * @param c the queue configuration to apply
     */
    public void init(C c) {
        this.config = c;
        ConsumerWrapper<M> freshWrapper = createConsumerWrapper(c);
        this.consumerWrapper = freshWrapper;
        log.debug("[{}] Initialized consumer for queue: {}", this.queueKey, c);
    }

    /**
     * Chooses the consumer strategy for a configuration.
     *
     * @param c the queue configuration
     * @return a per-partition wrapper when {@code c.isConsumerPerPartition()} is true, otherwise a single-consumer wrapper
     */
    protected ConsumerWrapper<M> createConsumerWrapper(C c) {
        if (c.isConsumerPerPartition()) {
            return new ConsumerPerPartitionWrapper();
        }
        return new SingleConsumerWrapper();
    }

    /**
     * Queues a configuration change; it is applied asynchronously by the task processor.
     *
     * @param c the new queue configuration
     */
    public void update(C c) {
        TbQueueConsumerManagerTask configTask = new TbQueueConsumerManagerTask.UpdateConfigTask(c);
        addTask(configTask);
    }

    /**
     * Queues a partition assignment change; it is applied asynchronously by the task processor.
     *
     * @param set the complete set of partitions that should be consumed
     */
    public void update(Set<TopicPartitionInfo> set) {
        TbQueueConsumerManagerTask partitionsTask = new TbQueueConsumerManagerTask.UpdatePartitionsTask(set);
        addTask(partitionsTask);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Enqueues a manager task and triggers asynchronous processing. Tasks are silently dropped
     * once the manager has been stopped.
     *
     * @param tbQueueConsumerManagerTask the task to enqueue
     */
    public void addTask(TbQueueConsumerManagerTask tbQueueConsumerManagerTask) {
        if (!this.stopped) {
            this.tasks.add(tbQueueConsumerManagerTask);
            log.trace("[{}] Added task: {}", this.queueKey, tbQueueConsumerManagerTask);
            tryProcessTasks();
        }
    }

    /**
     * Submits a job to the task executor that drains the pending task queue under the processing lock.
     *
     * <p>When the lock is held by another thread, the job reschedules itself after one second and exits.
     * While draining, partition and config update tasks are coalesced: only the most recently queued
     * value of each kind is applied, config first and partitions second. All other tasks are passed to
     * {@link #processTask}. Nothing is applied once the manager has been stopped.
     *
     * <p>Any exception raised while draining or applying updates is logged and ends the current run.
     * The lock is released only on paths where it was actually acquired.
     */
    private void tryProcessTasks() {
        this.taskExecutor.submit(() -> {
            if (!this.lock.tryLock()) {
                log.trace("[{}] Failed to acquire lock", this.queueKey);
                this.scheduler.schedule(this::tryProcessTasks, 1L, TimeUnit.SECONDS);
                return;
            }
            try {
                C newConfig = null;
                Set<TopicPartitionInfo> newPartitions = null;
                TbQueueConsumerManagerTask task;
                while (!this.stopped && (task = this.tasks.poll()) != null) {
                    log.trace("[{}] Processing task: {}", this.queueKey, task);
                    if (task instanceof TbQueueConsumerManagerTask.UpdatePartitionsTask) {
                        newPartitions = ((TbQueueConsumerManagerTask.UpdatePartitionsTask) task).partitions();
                    } else if (task instanceof TbQueueConsumerManagerTask.UpdateConfigTask) {
                        // Config tasks are only enqueued by update(C), so the payload is a C by construction.
                        @SuppressWarnings({"unchecked", "rawtypes"})
                        C queuedConfig = (C) ((TbQueueConsumerManagerTask.UpdateConfigTask) task).config();
                        newConfig = queuedConfig;
                    } else {
                        processTask(task);
                    }
                }
                if (this.stopped) {
                    return;
                }
                if (newConfig != null) {
                    doUpdate(newConfig);
                }
                if (newPartitions != null) {
                    doUpdate(newPartitions);
                }
            } catch (Exception e) {
                // The Future returned by submit() is discarded, so failures must be logged here to be visible.
                log.error("[{}] Failed to process tasks", this.queueKey, e);
            } finally {
                this.lock.unlock();
            }
        });
    }

    /**
     * Extension hook invoked from the task processor for every task that is neither a partition update
     * nor a config update. The default implementation does nothing.
     *
     * @param tbQueueConsumerManagerTask the task to handle
     */
    protected void processTask(TbQueueConsumerManagerTask tbQueueConsumerManagerTask) {
    }

    /**
     * Applies a new queue configuration.
     *
     * <p>If there was no previous configuration, the manager is simply initialized. If the
     * consumer-per-partition flag is unchanged, the configuration is swapped in place. Otherwise all
     * current consumers are stopped, the consumer wrapper is rebuilt and the last known partitions
     * (if any) are re-applied to it.
     *
     * @param c the new queue configuration
     */
    private void doUpdate(C c) {
        log.info("[{}] Processing queue update: {}", this.queueKey, c);
        final C previous = this.config;
        this.config = c;
        if (log.isTraceEnabled()) {
            log.trace("[{}] Old queue configuration: {}", this.queueKey, previous);
            log.trace("[{}] New queue configuration: {}", this.queueKey, c);
        }

        if (previous == null) {
            init(this.config);
            return;
        }

        if (c.isConsumerPerPartition() == previous.isConsumerPerPartition()) {
            log.trace("[{}] Silently applied new config, because consumer-per-partition not changed", this.queueKey);
            return;
        }

        // Consumption mode changed: signal every consumer first, then wait for each one to finish.
        this.consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
        this.consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);

        init(this.config);
        if (this.partitions != null) {
            doUpdate(this.partitions);
        }
    }

    /**
     * Records the given partition set as the current assignment and forwards it to the consumer wrapper.
     *
     * @param set the complete set of partitions that should be consumed
     */
    private void doUpdate(Set<TopicPartitionInfo> set) {
        // Remembered so that a later wrapper rebuild (config change) can re-apply the same assignment.
        this.partitions = set;
        this.consumerWrapper.updatePartitions(set);
    }

    /**
     * Starts the consumer loop of the given task on the consumer executor and stores the resulting
     * future in the task. After the loop ends, the task's finish callback (if any) is run; failures
     * of that callback are logged and otherwise ignored.
     *
     * @param tbQueueConsumerTask the consumer task to launch
     */
    private void launchConsumer(TbQueueConsumerTask<M> tbQueueConsumerTask) {
        log.info("[{}] Launching consumer", tbQueueConsumerTask.getKey());
        Runnable consumerJob = () -> {
            // Name the worker thread after the task key for the duration of the loop.
            ThingsBoardThreadFactory.updateCurrentThreadName(tbQueueConsumerTask.getKey().toString());
            consumerLoop(tbQueueConsumerTask.getConsumer());
            log.info("[{}] Consumer stopped", tbQueueConsumerTask.getKey());
            try {
                Optional.ofNullable(tbQueueConsumerTask.getCallback()).ifPresent(Runnable::run);
            } catch (Throwable th) {
                log.error("Failed to execute finish callback", th);
            }
        };
        tbQueueConsumerTask.setTask(this.consumerExecutor.submit(consumerJob));
    }

    /**
     * Polls the consumer and hands non-empty batches to {@link #processMsgs} until either the manager
     * or the consumer is stopped.
     *
     * <p>An {@link Exception} during poll or processing is logged and followed by a pause of one poll
     * interval, unless the consumer is already stopped. Any other {@link Throwable} is logged, reported
     * to the uncaught error handler (when set), and ends the loop after unsubscribing the consumer.
     * On a regular exit the consumer is unsubscribed only if it reports itself as stopped.
     *
     * @param tbQueueConsumer the consumer to poll
     */
    private void consumerLoop(TbQueueConsumer<M> tbQueueConsumer) {
        while (!(this.stopped || tbQueueConsumer.isStopped())) {
            try {
                try {
                    List<M> batch = tbQueueConsumer.poll(this.config.getPollInterval());
                    if (batch.isEmpty()) {
                        continue;
                    }
                    processMsgs(batch, tbQueueConsumer, this.config);
                } catch (Exception failure) {
                    if (tbQueueConsumer.isStopped()) {
                        // Errors raised while shutting down are expected; re-check the loop condition.
                        continue;
                    }
                    log.warn("Failed to process messages from queue", failure);
                    try {
                        Thread.sleep(this.config.getPollInterval());
                    } catch (InterruptedException interrupted) {
                        // The interruption is only traced; the loop condition decides whether to go on.
                        log.trace("Failed to wait until the server has capacity to handle new requests", interrupted);
                    }
                }
            } catch (Throwable fatal) {
                log.error("Failure in consumer loop", fatal);
                if (this.uncaughtErrorHandler != null) {
                    this.uncaughtErrorHandler.accept(fatal);
                }
                tbQueueConsumer.unsubscribe();
                return;
            }
        }
        if (tbQueueConsumer.isStopped()) {
            tbQueueConsumer.unsubscribe();
        }
    }

    /**
     * Passes a polled batch to the configured message pack processor, tracing the batch size
     * before and after the call.
     *
     * @param list            the polled messages
     * @param tbQueueConsumer the consumer the messages were polled from
     * @param c               the queue configuration in effect
     * @throws Exception if the processor fails
     */
    protected void processMsgs(List<M> list, TbQueueConsumer<M> tbQueueConsumer, C c) throws Exception {
        log.trace("Processing {} messages", list.size());
        this.msgPackProcessor.process(list, tbQueueConsumer, c);
        log.trace("Processed {} messages", list.size());
    }

    /**
     * Signals every current consumer to stop and marks the manager as stopped. This method does not
     * wait for the consumers to finish; use {@link #awaitStop()} for that.
     *
     * <p>Safe to call on a manager that was never initialized (constructed with a {@code null} config):
     * in that case there are no consumers to signal and only the stopped flag is set.
     */
    public void stop() {
        log.debug("[{}] Stopping consumers", this.queueKey);
        // Read the volatile field once; it is null when no configuration has been applied yet.
        ConsumerWrapper<M> wrapper = this.consumerWrapper;
        if (wrapper != null) {
            wrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
        }
        this.stopped = true;
    }

    /**
     * Waits for the consumers to complete, passing 30 to the per-consumer wait
     * (presumably a timeout; the unit is defined by {@code TbQueueConsumerTask.awaitCompletion(int)} — verify there).
     */
    public void awaitStop() {
        awaitStop(30);
    }

    /**
     * Waits for every current consumer task to complete.
     *
     * <p>Does nothing besides logging when the manager was never initialized and therefore has no
     * consumer wrapper.
     *
     * @param i value forwarded to {@code TbQueueConsumerTask.awaitCompletion(int)} for each consumer
     *          (presumably a timeout — confirm the unit against that method)
     */
    private void awaitStop(int i) {
        log.debug("[{}] Waiting for consumers to stop", this.queueKey);
        // Read the volatile field once; it is null when no configuration has been applied yet.
        ConsumerWrapper<M> wrapper = this.consumerWrapper;
        if (wrapper != null) {
            wrapper.getConsumers().forEach(task -> task.awaitCompletion(i));
        }
        log.debug("[{}] Unsubscribed and stopped consumers", this.queueKey);
    }

    /**
     * @param <M> type of the consumed queue messages
     * @param <C> type of the queue configuration
     * @return a new, empty builder for {@link MainQueueConsumerManager}
     */
    public static <M extends TbQueueMsg, C extends QueueConfig> MainQueueConsumerManagerBuilder<M, C> builder() {
        return new MainQueueConsumerManagerBuilder<>();
    }

    /** @return the identifier of the managed queue */
    public Object getQueueKey() {
        return this.queueKey;
    }

    /** @return the current queue configuration; {@code null} if none has been applied yet */
    public C getConfig() {
        return this.config;
    }

    /** @return the executor that runs the consumer loops */
    public ExecutorService getConsumerExecutor() {
        return this.consumerExecutor;
    }

    /** @return the scheduler used for delayed retries of task processing */
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    /** @return the executor that processes queued manager tasks */
    public ExecutorService getTaskExecutor() {
        return this.taskExecutor;
    }

    /** @return the partition set most recently applied; {@code null} before the first partition update */
    public Set<TopicPartitionInfo> getPartitions() {
        return this.partitions;
    }
}
