package org.thingsboard.server.queue.common.consumer;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.discovery.QueueKey;

/* loaded from: input_file:org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.class */
public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQueueConsumerManager<M, QueueConfig> {
    private static final Logger log = LoggerFactory.getLogger(PartitionedQueueConsumerManager.class);
    private final MainQueueConsumerManager<M, QueueConfig>.ConsumerPerPartitionWrapper consumerWrapper;
    private final TbQueueAdmin queueAdmin;
    private final String topic;

    /* loaded from: input_file:org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager$PartitionedQueueConsumerManagerBuilder.class */
    /**
     * Fluent builder for {@link PartitionedQueueConsumerManager}.
     *
     * <p>Each setter stores its argument and returns this builder so calls can be chained;
     * {@link #build()} forwards the collected values, unchanged and unvalidated, to the
     * manager's constructor. Fields that are never set keep their Java defaults
     * ({@code null}, or {@code 0} for {@code pollInterval}).
     *
     * @param <M> type of queue message handled by the manager being built
     */
    public static class PartitionedQueueConsumerManagerBuilder<M extends TbQueueMsg> {
        private QueueKey queueKey;
        private String topic;
        private long pollInterval;
        private MainQueueConsumerManager.MsgPackProcessor<M, QueueConfig> msgPackProcessor;
        private BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator;
        private TbQueueAdmin queueAdmin;
        private ExecutorService consumerExecutor;
        private ScheduledExecutorService scheduler;
        private ExecutorService taskExecutor;
        private Consumer<Throwable> uncaughtErrorHandler;

        /** Package-private: instances are obtained through {@code PartitionedQueueConsumerManager.create()}. */
        PartitionedQueueConsumerManagerBuilder() {
        }

        /**
         * @param queueKey key of the queue the manager is created for
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> queueKey(QueueKey queueKey) {
            this.queueKey = queueKey;
            return this;
        }

        /**
         * @param topic topic name later exposed by the manager's {@code getTopic()}
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> topic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * @param pollInterval poll interval handed to {@code QueueConfig.of(...)} by the manager's constructor
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        /**
         * @param msgPackProcessor message pack processor passed through to the parent manager
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> msgPackProcessor(MainQueueConsumerManager.MsgPackProcessor<M, QueueConfig> msgPackProcessor) {
            this.msgPackProcessor = msgPackProcessor;
            return this;
        }

        /**
         * @param consumerCreator function producing a {@link TbQueueConsumer} from a queue config and a topic partition
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> consumerCreator(BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator) {
            this.consumerCreator = consumerCreator;
            return this;
        }

        /**
         * @param queueAdmin admin the manager uses to delete topics
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> queueAdmin(TbQueueAdmin queueAdmin) {
            this.queueAdmin = queueAdmin;
            return this;
        }

        /**
         * @param consumerExecutor executor passed through to the parent manager as its consumer executor
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        /**
         * @param scheduler scheduled executor passed through to the parent manager
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /**
         * @param taskExecutor executor passed through to the parent manager as its task executor
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> taskExecutor(ExecutorService taskExecutor) {
            this.taskExecutor = taskExecutor;
            return this;
        }

        /**
         * @param uncaughtErrorHandler error callback passed through to the parent manager
         * @return this builder
         */
        public PartitionedQueueConsumerManagerBuilder<M> uncaughtErrorHandler(Consumer<Throwable> uncaughtErrorHandler) {
            this.uncaughtErrorHandler = uncaughtErrorHandler;
            return this;
        }

        /**
         * Creates the manager from the values collected so far.
         *
         * @return a new {@link PartitionedQueueConsumerManager}
         */
        public PartitionedQueueConsumerManager<M> build() {
            return new PartitionedQueueConsumerManager<>(this.queueKey, this.topic, this.pollInterval, this.msgPackProcessor, this.consumerCreator, this.queueAdmin, this.consumerExecutor, this.scheduler, this.taskExecutor, this.uncaughtErrorHandler);
        }

        /**
         * Renders every builder field as {@code name=value}.
         *
         * <p>Each label is paired with its own field. String concatenation renders a
         * {@code null} reference as {@code "null"}, so unset fields are safe to print.
         *
         * @return a diagnostic description of the builder state
         */
        @Override
        public String toString() {
            return "PartitionedQueueConsumerManager.PartitionedQueueConsumerManagerBuilder(queueKey=" + this.queueKey
                    + ", topic=" + this.topic
                    + ", pollInterval=" + this.pollInterval
                    + ", msgPackProcessor=" + this.msgPackProcessor
                    + ", consumerCreator=" + this.consumerCreator
                    + ", queueAdmin=" + this.queueAdmin
                    + ", consumerExecutor=" + this.consumerExecutor
                    + ", scheduler=" + this.scheduler
                    + ", taskExecutor=" + this.taskExecutor
                    + ", uncaughtErrorHandler=" + this.uncaughtErrorHandler + ")";
        }
    }

    /**
     * Creates a manager for the given queue key and topic.
     *
     * <p>The parent manager is initialised with {@code QueueConfig.of(true, j)}.
     * NOTE(review): the {@code true} flag presumably selects consumer-per-partition mode,
     * which the cast of the parent's {@code consumerWrapper} below relies on — confirm
     * against {@code QueueConfig.of}.
     *
     * @param queueKey key of the queue, forwarded to the parent manager
     * @param str topic name returned by {@link #getTopic()}
     * @param j poll interval handed to {@code QueueConfig.of}
     * @param msgPackProcessor message pack processor, forwarded to the parent manager
     * @param biFunction consumer factory, forwarded to the parent manager
     * @param tbQueueAdmin admin used to delete topics when handling delete tasks
     * @param executorService consumer executor (the builder's {@code consumerExecutor})
     * @param scheduledExecutorService scheduler, forwarded to the parent manager
     * @param executorService2 task executor (the builder's {@code taskExecutor})
     * @param consumer uncaught error handler, forwarded to the parent manager
     */
    public PartitionedQueueConsumerManager(QueueKey queueKey, String str, long j, MainQueueConsumerManager.MsgPackProcessor<M, QueueConfig> msgPackProcessor, BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> biFunction, TbQueueAdmin tbQueueAdmin, ExecutorService executorService, ScheduledExecutorService scheduledExecutorService, ExecutorService executorService2, Consumer<Throwable> consumer) {
        super(queueKey, QueueConfig.of(true, j), msgPackProcessor, biFunction, executorService, scheduledExecutorService, executorService2, consumer);
        this.queueAdmin = tbQueueAdmin;
        this.topic = str;
        // Keep a narrowed reference to the wrapper created by the parent so the
        // partition-level add/remove operations are reachable from this class.
        this.consumerWrapper = (MainQueueConsumerManager.ConsumerPerPartitionWrapper) super.consumerWrapper;
    }

    /**
     * Handles the partition-related tasks of this manager.
     *
     * <ul>
     *   <li>Add task: logs, then adds the partitions to the consumer wrapper together with the
     *       task's {@code onStop} callback and {@code startOffsetProvider}.</li>
     *   <li>Remove task: logs, then removes the partitions from the consumer wrapper.</li>
     *   <li>Delete task: logs, removes the partitions, then deletes each partition's full topic
     *       through the queue admin. A failure to delete one topic is logged and does not stop
     *       the remaining deletions.</li>
     * </ul>
     *
     * <p>Any other task type is ignored.
     *
     * @param tbQueueConsumerManagerTask the task to handle
     */
    @Override // org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager
    protected void processTask(TbQueueConsumerManagerTask tbQueueConsumerManagerTask) {
        if (tbQueueConsumerManagerTask instanceof TbQueueConsumerManagerTask.AddPartitionsTask) {
            TbQueueConsumerManagerTask.AddPartitionsTask addition = (TbQueueConsumerManagerTask.AddPartitionsTask) tbQueueConsumerManagerTask;
            log.info("[{}] Added partitions: {}", this.queueKey, addition.partitions());
            this.consumerWrapper.addPartitions(addition.partitions(), addition.onStop(), addition.startOffsetProvider());
            return;
        }

        if (tbQueueConsumerManagerTask instanceof TbQueueConsumerManagerTask.RemovePartitionsTask) {
            TbQueueConsumerManagerTask.RemovePartitionsTask removal = (TbQueueConsumerManagerTask.RemovePartitionsTask) tbQueueConsumerManagerTask;
            log.info("[{}] Removed partitions: {}", this.queueKey, removal.partitions());
            this.consumerWrapper.removePartitions(removal.partitions());
            return;
        }

        if (!(tbQueueConsumerManagerTask instanceof TbQueueConsumerManagerTask.DeletePartitionsTask)) {
            // Not a task this class knows about: nothing to do.
            return;
        }

        TbQueueConsumerManagerTask.DeletePartitionsTask deletion = (TbQueueConsumerManagerTask.DeletePartitionsTask) tbQueueConsumerManagerTask;
        log.info("[{}] Removing partitions and deleting topics: {}", this.queueKey, deletion.partitions());
        this.consumerWrapper.removePartitions(deletion.partitions());
        for (TopicPartitionInfo partition : deletion.partitions()) {
            String topicName = partition.getFullTopicName();
            try {
                this.queueAdmin.deleteTopic(topicName);
            } catch (Throwable failure) {
                // Best effort: keep going so one bad topic does not block the others.
                log.error("Failed to delete topic {}", topicName, failure);
            }
        }
    }

    /**
     * Requests the addition of the given partitions without a stop callback and without a
     * start-offset provider; delegates to
     * {@link #addPartitions(Set, Consumer, Function)} with {@code null} for both.
     *
     * @param set partitions to add
     */
    public void addPartitions(Set<TopicPartitionInfo> set) {
        Consumer<TopicPartitionInfo> noStopCallback = null;
        Function<String, Long> noStartOffsetProvider = null;
        addPartitions(set, noStopCallback, noStartOffsetProvider);
    }

    /**
     * Wraps the arguments in an {@code AddPartitionsTask} and submits it through {@code addTask}.
     * NOTE(review): the task is presumably dispatched to {@code processTask} later by the
     * parent manager — confirm in {@code MainQueueConsumerManager}.
     *
     * @param set partitions to add
     * @param consumer callback stored in the task (read back as {@code onStop()} when the task is processed); may be {@code null}
     * @param function function stored in the task (read back as {@code startOffsetProvider()}); may be {@code null}
     */
    public void addPartitions(Set<TopicPartitionInfo> set, Consumer<TopicPartitionInfo> consumer, Function<String, Long> function) {
        TbQueueConsumerManagerTask.AddPartitionsTask additionTask = new TbQueueConsumerManagerTask.AddPartitionsTask(set, consumer, function);
        addTask(additionTask);
    }

    /**
     * Wraps the partitions in a {@code RemovePartitionsTask} and submits it through {@code addTask}.
     *
     * @param set partitions to remove
     */
    public void removePartitions(Set<TopicPartitionInfo> set) {
        TbQueueConsumerManagerTask.RemovePartitionsTask removalTask = new TbQueueConsumerManagerTask.RemovePartitionsTask(set);
        addTask(removalTask);
    }

    /**
     * Wraps the partitions in a {@code DeletePartitionsTask} and submits it through {@code addTask}.
     * When such a task is processed, the partitions are removed and their topics deleted.
     *
     * @param set partitions whose consumers should be removed and whose topics should be deleted
     */
    public void delete(Set<TopicPartitionInfo> set) {
        TbQueueConsumerManagerTask.DeletePartitionsTask deletionTask = new TbQueueConsumerManagerTask.DeletePartitionsTask(set);
        addTask(deletionTask);
    }

    /**
     * Entry point of the fluent builder API.
     *
     * @param <M> type of queue message handled by the manager to be built
     * @return a new, empty builder
     */
    public static <M extends TbQueueMsg> PartitionedQueueConsumerManagerBuilder<M> create() {
        PartitionedQueueConsumerManagerBuilder<M> builder = new PartitionedQueueConsumerManagerBuilder<>();
        return builder;
    }

    /**
     * @return the topic name supplied at construction time
     */
    public String getTopic() {
        return topic;
    }
}
