/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common.consumer;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;

public class PartitionedQueueConsumerManager<M extends TbQueueMsg>
extends MainQueueConsumerManager<M, QueueConfig> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PartitionedQueueConsumerManager.class);
    private final MainQueueConsumerManager.ConsumerPerPartitionWrapper consumerWrapper;
    private final TbQueueAdmin queueAdmin;
    private final String topic;

    public PartitionedQueueConsumerManager(Object queueKey, String topic, long pollInterval, MainQueueConsumerManager.MsgPackProcessor<M, QueueConfig> msgPackProcessor, BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator, TbQueueAdmin queueAdmin, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor, Consumer<Throwable> uncaughtErrorHandler) {
        super(queueKey, QueueConfig.of((boolean)true, (long)pollInterval), msgPackProcessor, consumerCreator, consumerExecutor, scheduler, taskExecutor, uncaughtErrorHandler);
        this.topic = topic;
        this.consumerWrapper = (MainQueueConsumerManager.ConsumerPerPartitionWrapper)((MainQueueConsumerManager)this).consumerWrapper;
        this.queueAdmin = queueAdmin;
    }

    @Override
    protected void processTask(TbQueueConsumerManagerTask task) {
        if (task instanceof TbQueueConsumerManagerTask.AddPartitionsTask) {
            TbQueueConsumerManagerTask.AddPartitionsTask addPartitionsTask = (TbQueueConsumerManagerTask.AddPartitionsTask)task;
            log.info("[{}] Added partitions: {}", this.queueKey, addPartitionsTask.partitions());
            this.consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop(), addPartitionsTask.startOffsetProvider());
        } else if (task instanceof TbQueueConsumerManagerTask.RemovePartitionsTask) {
            TbQueueConsumerManagerTask.RemovePartitionsTask removePartitionsTask = (TbQueueConsumerManagerTask.RemovePartitionsTask)task;
            log.info("[{}] Removed partitions: {}", this.queueKey, removePartitionsTask.partitions());
            this.consumerWrapper.removePartitions(removePartitionsTask.partitions());
        } else if (task instanceof TbQueueConsumerManagerTask.DeletePartitionsTask) {
            TbQueueConsumerManagerTask.DeletePartitionsTask deletePartitionsTask = (TbQueueConsumerManagerTask.DeletePartitionsTask)task;
            log.info("[{}] Removing partitions and deleting topics: {}", this.queueKey, deletePartitionsTask.partitions());
            this.consumerWrapper.removePartitions(deletePartitionsTask.partitions());
            deletePartitionsTask.partitions().forEach(tpi -> {
                String topic = tpi.getFullTopicName();
                try {
                    this.queueAdmin.deleteTopic(topic);
                }
                catch (Throwable t) {
                    log.error("Failed to delete topic {}", (Object)topic, (Object)t);
                }
            });
        }
    }

    public void addPartitions(Set<TopicPartitionInfo> partitions) {
        this.addPartitions(partitions, null, null);
    }

    public void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
        this.addTask(new TbQueueConsumerManagerTask.AddPartitionsTask(partitions, onStop, startOffsetProvider));
    }

    public void removePartitions(Set<TopicPartitionInfo> partitions) {
        this.addTask(new TbQueueConsumerManagerTask.RemovePartitionsTask(partitions));
    }

    public void delete(Set<TopicPartitionInfo> partitions) {
        this.addTask(new TbQueueConsumerManagerTask.DeletePartitionsTask(partitions));
    }

    @Generated
    public static <M extends TbQueueMsg> PartitionedQueueConsumerManagerBuilder<M> create() {
        return new PartitionedQueueConsumerManagerBuilder();
    }

    @Generated
    public String getTopic() {
        return this.topic;
    }

    @Generated
    public static class PartitionedQueueConsumerManagerBuilder<M extends TbQueueMsg> {
        @Generated
        private Object queueKey;
        @Generated
        private String topic;
        @Generated
        private long pollInterval;
        @Generated
        private MainQueueConsumerManager.MsgPackProcessor<M, QueueConfig> msgPackProcessor;
        @Generated
        private BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator;
        @Generated
        private TbQueueAdmin queueAdmin;
        @Generated
        private ExecutorService consumerExecutor;
        @Generated
        private ScheduledExecutorService scheduler;
        @Generated
        private ExecutorService taskExecutor;
        @Generated
        private Consumer<Throwable> uncaughtErrorHandler;

        @Generated
        PartitionedQueueConsumerManagerBuilder() {
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> queueKey(Object queueKey) {
            this.queueKey = queueKey;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> topic(String topic) {
            this.topic = topic;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> msgPackProcessor(MainQueueConsumerManager.MsgPackProcessor<M, QueueConfig> msgPackProcessor) {
            this.msgPackProcessor = msgPackProcessor;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> consumerCreator(BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator) {
            this.consumerCreator = consumerCreator;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> queueAdmin(TbQueueAdmin queueAdmin) {
            this.queueAdmin = queueAdmin;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> taskExecutor(ExecutorService taskExecutor) {
            this.taskExecutor = taskExecutor;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManagerBuilder<M> uncaughtErrorHandler(Consumer<Throwable> uncaughtErrorHandler) {
            this.uncaughtErrorHandler = uncaughtErrorHandler;
            return this;
        }

        @Generated
        public PartitionedQueueConsumerManager<M> build() {
            return new PartitionedQueueConsumerManager<M>(this.queueKey, this.topic, this.pollInterval, this.msgPackProcessor, this.consumerCreator, this.queueAdmin, this.consumerExecutor, this.scheduler, this.taskExecutor, this.uncaughtErrorHandler);
        }

        @Generated
        public String toString() {
            return "PartitionedQueueConsumerManager.PartitionedQueueConsumerManagerBuilder(queueKey=" + String.valueOf(this.queueKey) + ", topic=" + this.topic + ", pollInterval=" + this.pollInterval + ", msgPackProcessor=" + String.valueOf(this.msgPackProcessor) + ", consumerCreator=" + String.valueOf(this.consumerCreator) + ", queueAdmin=" + String.valueOf(this.queueAdmin) + ", consumerExecutor=" + String.valueOf(this.consumerExecutor) + ", scheduler=" + String.valueOf(this.scheduler) + ", taskExecutor=" + String.valueOf(this.taskExecutor) + ", uncaughtErrorHandler=" + String.valueOf(this.uncaughtErrorHandler) + ")";
        }
    }
}

