/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common.consumer;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerTask;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;

public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MainQueueConsumerManager.class);
    protected final Object queueKey;
    protected C config;
    protected final MsgPackProcessor<M, C> msgPackProcessor;
    protected final BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator;
    protected final ExecutorService consumerExecutor;
    protected final ScheduledExecutorService scheduler;
    protected final ExecutorService taskExecutor;
    protected final Consumer<Throwable> uncaughtErrorHandler;
    private final Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<TbQueueConsumerManagerTask>();
    private final ReentrantLock lock = new ReentrantLock();
    private volatile Set<TopicPartitionInfo> partitions;
    protected volatile ConsumerWrapper<M> consumerWrapper;
    protected volatile boolean stopped;

    public MainQueueConsumerManager(Object queueKey, C config, MsgPackProcessor<M, C> msgPackProcessor, BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor, Consumer<Throwable> uncaughtErrorHandler) {
        this.queueKey = queueKey;
        this.config = config;
        this.msgPackProcessor = msgPackProcessor;
        this.consumerCreator = consumerCreator;
        this.consumerExecutor = consumerExecutor;
        this.scheduler = scheduler;
        this.taskExecutor = taskExecutor;
        this.uncaughtErrorHandler = uncaughtErrorHandler;
        if (config != null) {
            this.init(config);
        }
    }

    public void init(C config) {
        this.config = config;
        this.consumerWrapper = this.createConsumerWrapper(config);
        log.debug("[{}] Initialized consumer for queue: {}", this.queueKey, config);
    }

    protected ConsumerWrapper<M> createConsumerWrapper(C config) {
        if (config.isConsumerPerPartition()) {
            return new ConsumerPerPartitionWrapper();
        }
        return new SingleConsumerWrapper();
    }

    public void update(C config) {
        this.addTask(new TbQueueConsumerManagerTask.UpdateConfigTask((QueueConfig)config));
    }

    public void update(Set<TopicPartitionInfo> partitions) {
        this.addTask(new TbQueueConsumerManagerTask.UpdatePartitionsTask(partitions));
    }

    protected void addTask(TbQueueConsumerManagerTask todo) {
        if (this.stopped) {
            return;
        }
        this.tasks.add(todo);
        log.trace("[{}] Added task: {}", this.queueKey, (Object)todo);
        this.tryProcessTasks();
    }

    private void tryProcessTasks() {
        this.taskExecutor.submit(() -> {
            if (this.lock.tryLock()) {
                try {
                    TbQueueConsumerManagerTask task;
                    QueueConfig newConfig = null;
                    Set<TopicPartitionInfo> newPartitions = null;
                    while (!this.stopped && (task = this.tasks.poll()) != null) {
                        log.trace("[{}] Processing task: {}", this.queueKey, (Object)task);
                        if (task instanceof TbQueueConsumerManagerTask.UpdatePartitionsTask) {
                            TbQueueConsumerManagerTask.UpdatePartitionsTask updatePartitionsTask = (TbQueueConsumerManagerTask.UpdatePartitionsTask)task;
                            newPartitions = updatePartitionsTask.partitions();
                            continue;
                        }
                        if (task instanceof TbQueueConsumerManagerTask.UpdateConfigTask) {
                            TbQueueConsumerManagerTask.UpdateConfigTask updateConfigTask = (TbQueueConsumerManagerTask.UpdateConfigTask)task;
                            newConfig = updateConfigTask.config();
                            continue;
                        }
                        this.processTask(task);
                    }
                    if (this.stopped) {
                        return;
                    }
                    if (newConfig != null) {
                        this.doUpdate(newConfig);
                    }
                    if (newPartitions == null) return;
                    this.doUpdate(newPartitions);
                    return;
                }
                catch (Exception e) {
                    log.error("[{}] Failed to process tasks", this.queueKey, (Object)e);
                    return;
                }
                finally {
                    this.lock.unlock();
                }
            } else {
                log.trace("[{}] Failed to acquire lock", this.queueKey);
                this.scheduler.schedule(this::tryProcessTasks, 1L, TimeUnit.SECONDS);
            }
        });
    }

    protected void processTask(TbQueueConsumerManagerTask task) {
    }

    private void doUpdate(C newConfig) {
        log.info("[{}] Processing queue update: {}", this.queueKey, newConfig);
        C oldConfig = this.config;
        this.config = newConfig;
        if (log.isTraceEnabled()) {
            log.trace("[{}] Old queue configuration: {}", this.queueKey, oldConfig);
            log.trace("[{}] New queue configuration: {}", this.queueKey, newConfig);
        }
        if (oldConfig == null) {
            this.init(this.config);
        } else if (newConfig.isConsumerPerPartition() != oldConfig.isConsumerPerPartition()) {
            this.consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
            this.consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
            this.init(this.config);
            if (this.partitions != null) {
                this.doUpdate(this.partitions);
            }
        } else {
            log.trace("[{}] Silently applied new config, because consumer-per-partition not changed", this.queueKey);
        }
    }

    private void doUpdate(Set<TopicPartitionInfo> partitions) {
        this.partitions = partitions;
        this.consumerWrapper.updatePartitions(partitions);
    }

    private void launchConsumer(TbQueueConsumerTask<M> consumerTask) {
        log.info("[{}] Launching consumer", consumerTask.getKey());
        Future<?> consumerLoop = this.consumerExecutor.submit(() -> {
            ThingsBoardThreadFactory.updateCurrentThreadName((String)consumerTask.getKey().toString());
            this.consumerLoop(consumerTask.getKey(), consumerTask.getConsumer());
            log.info("[{}] Consumer stopped", consumerTask.getKey());
            try {
                Runnable callback = consumerTask.getCallback();
                if (callback != null) {
                    callback.run();
                }
            }
            catch (Throwable t) {
                log.error("Failed to execute finish callback", t);
            }
        });
        consumerTask.setTask(consumerLoop);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void consumerLoop(Object consumerKey, TbQueueConsumer<M> consumer) {
        try {
            while (!this.stopped && !consumer.isStopped()) {
                try {
                    List msgs = consumer.poll((long)this.config.getPollInterval());
                    if (msgs.isEmpty()) continue;
                    this.processMsgs(msgs, consumer, consumerKey, this.config);
                }
                catch (Exception e) {
                    if (consumer.isStopped()) continue;
                    log.warn("Failed to process messages from queue", (Throwable)e);
                    try {
                        Thread.sleep(this.config.getPollInterval());
                    }
                    catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", (Throwable)e2);
                    }
                }
            }
            if (!consumer.isStopped()) return;
            consumer.unsubscribe();
            return;
        }
        catch (Throwable t) {
            log.error("Failure in consumer loop", t);
            if (this.uncaughtErrorHandler != null) {
                this.uncaughtErrorHandler.accept(t);
            }
            consumer.unsubscribe();
        }
    }

    protected void processMsgs(List<M> msgs, TbQueueConsumer<M> consumer, Object consumerKey, C config) throws Exception {
        log.trace("Processing {} messages", (Object)msgs.size());
        this.msgPackProcessor.process(msgs, consumer, consumerKey, config);
        log.trace("Processed {} messages", (Object)msgs.size());
    }

    public void stop() {
        log.debug("[{}] Stopping consumers", this.queueKey);
        this.consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
        this.stopped = true;
    }

    public void awaitStop() {
        this.awaitStop(30);
    }

    private void awaitStop(int timeoutSec) {
        log.debug("[{}] Waiting for consumers to stop", this.queueKey);
        this.consumerWrapper.getConsumers().forEach(consumerTask -> consumerTask.awaitCompletion(timeoutSec));
        log.debug("[{}] Unsubscribed and stopped consumers", this.queueKey);
    }

    @Generated
    public static <M extends TbQueueMsg, C extends QueueConfig> MainQueueConsumerManagerBuilder<M, C> builder() {
        return new MainQueueConsumerManagerBuilder();
    }

    @Generated
    public Object getQueueKey() {
        return this.queueKey;
    }

    @Generated
    public C getConfig() {
        return this.config;
    }

    @Generated
    public ExecutorService getConsumerExecutor() {
        return this.consumerExecutor;
    }

    @Generated
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    @Generated
    public ExecutorService getTaskExecutor() {
        return this.taskExecutor;
    }

    @Generated
    public Set<TopicPartitionInfo> getPartitions() {
        return this.partitions;
    }

    public static interface MsgPackProcessor<M extends TbQueueMsg, C extends QueueConfig> {
        public void process(List<M> var1, TbQueueConsumer<M> var2, Object var3, C var4) throws Exception;
    }

    public static interface ConsumerWrapper<M extends TbQueueMsg> {
        public void updatePartitions(Set<TopicPartitionInfo> var1);

        public Collection<TbQueueConsumerTask<M>> getConsumers();
    }

    class ConsumerPerPartitionWrapper
    implements ConsumerWrapper<M> {
        private final Map<TopicPartitionInfo, TbQueueConsumerTask<M>> consumers = new HashMap();

        ConsumerPerPartitionWrapper() {
        }

        @Override
        public void updatePartitions(Set<TopicPartitionInfo> partitions) {
            HashSet<TopicPartitionInfo> addedPartitions = new HashSet<TopicPartitionInfo>(partitions);
            addedPartitions.removeAll(this.consumers.keySet());
            HashSet<TopicPartitionInfo> removedPartitions = new HashSet<TopicPartitionInfo>(this.consumers.keySet());
            removedPartitions.removeAll(partitions);
            log.info("[{}] Added partitions: {}, removed partitions: {}", new Object[]{MainQueueConsumerManager.this.queueKey, addedPartitions, removedPartitions});
            this.removePartitions(removedPartitions);
            this.addPartitions(addedPartitions, null, null);
        }

        protected void removePartitions(Set<TopicPartitionInfo> removedPartitions) {
            removedPartitions.forEach(tpi -> Optional.ofNullable(this.consumers.get(tpi)).ifPresent(TbQueueConsumerTask::initiateStop));
            removedPartitions.forEach(tpi -> Optional.ofNullable(this.consumers.remove(tpi)).ifPresent(TbQueueConsumerTask::awaitCompletion));
        }

        protected void addPartitions(Set<TopicPartitionInfo> partitions, Consumer<TopicPartitionInfo> onStop, Function<String, Long> startOffsetProvider) {
            partitions.forEach(tpi -> {
                Integer partitionId = tpi.getPartition().orElse(-1);
                String key = String.valueOf(MainQueueConsumerManager.this.queueKey) + "-" + partitionId;
                Runnable callback = onStop != null ? () -> onStop.accept((TopicPartitionInfo)tpi) : null;
                TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, () -> {
                    TbQueueConsumer queueConsumer = MainQueueConsumerManager.this.consumerCreator.apply(MainQueueConsumerManager.this.config, (TopicPartitionInfo)tpi);
                    if (startOffsetProvider != null && queueConsumer instanceof TbKafkaConsumerTemplate) {
                        TbKafkaConsumerTemplate kafkaConsumer = (TbKafkaConsumerTemplate)queueConsumer;
                        kafkaConsumer.setStartOffsetProvider(startOffsetProvider);
                    }
                    return queueConsumer;
                }, callback);
                this.consumers.put((TopicPartitionInfo)tpi, consumer);
                consumer.subscribe(Set.of(tpi));
                MainQueueConsumerManager.this.launchConsumer(consumer);
            });
        }

        @Override
        public Collection<TbQueueConsumerTask<M>> getConsumers() {
            return this.consumers.values();
        }
    }

    class SingleConsumerWrapper
    implements ConsumerWrapper<M> {
        private TbQueueConsumerTask<M> consumer;

        SingleConsumerWrapper() {
        }

        @Override
        public void updatePartitions(Set<TopicPartitionInfo> partitions) {
            log.info("[{}] New partitions: {}", MainQueueConsumerManager.this.queueKey, partitions);
            if (partitions.isEmpty()) {
                if (this.consumer != null && this.consumer.isRunning()) {
                    this.consumer.initiateStop();
                    this.consumer.awaitCompletion();
                }
                this.consumer = null;
                return;
            }
            if (this.consumer == null) {
                this.consumer = new TbQueueConsumerTask(MainQueueConsumerManager.this.queueKey, () -> MainQueueConsumerManager.this.consumerCreator.apply(MainQueueConsumerManager.this.config, null), null);
            }
            this.consumer.subscribe(partitions);
            if (!this.consumer.isRunning()) {
                MainQueueConsumerManager.this.launchConsumer(this.consumer);
            }
        }

        @Override
        public Collection<TbQueueConsumerTask<M>> getConsumers() {
            if (this.consumer == null) {
                return Collections.emptyList();
            }
            return List.of(this.consumer);
        }
    }

    @Generated
    public static class MainQueueConsumerManagerBuilder<M extends TbQueueMsg, C extends QueueConfig> {
        @Generated
        private Object queueKey;
        @Generated
        private C config;
        @Generated
        private MsgPackProcessor<M, C> msgPackProcessor;
        @Generated
        private BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator;
        @Generated
        private ExecutorService consumerExecutor;
        @Generated
        private ScheduledExecutorService scheduler;
        @Generated
        private ExecutorService taskExecutor;
        @Generated
        private Consumer<Throwable> uncaughtErrorHandler;

        @Generated
        MainQueueConsumerManagerBuilder() {
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> queueKey(Object queueKey) {
            this.queueKey = queueKey;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> config(C config) {
            this.config = config;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> msgPackProcessor(MsgPackProcessor<M, C> msgPackProcessor) {
            this.msgPackProcessor = msgPackProcessor;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> consumerCreator(BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator) {
            this.consumerCreator = consumerCreator;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> taskExecutor(ExecutorService taskExecutor) {
            this.taskExecutor = taskExecutor;
            return this;
        }

        @Generated
        public MainQueueConsumerManagerBuilder<M, C> uncaughtErrorHandler(Consumer<Throwable> uncaughtErrorHandler) {
            this.uncaughtErrorHandler = uncaughtErrorHandler;
            return this;
        }

        @Generated
        public MainQueueConsumerManager<M, C> build() {
            return new MainQueueConsumerManager<M, C>(this.queueKey, this.config, this.msgPackProcessor, this.consumerCreator, this.consumerExecutor, this.scheduler, this.taskExecutor, this.uncaughtErrorHandler);
        }

        @Generated
        public String toString() {
            return "MainQueueConsumerManager.MainQueueConsumerManagerBuilder(queueKey=" + String.valueOf(this.queueKey) + ", config=" + String.valueOf(this.config) + ", msgPackProcessor=" + String.valueOf(this.msgPackProcessor) + ", consumerCreator=" + String.valueOf(this.consumerCreator) + ", consumerExecutor=" + String.valueOf(this.consumerExecutor) + ", scheduler=" + String.valueOf(this.scheduler) + ", taskExecutor=" + String.valueOf(this.taskExecutor) + ", uncaughtErrorHandler=" + String.valueOf(this.uncaughtErrorHandler) + ")";
        }
    }
}

