/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common.consumer;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;

public class QueueConsumerManager<M extends TbQueueMsg> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(QueueConsumerManager.class);
    private final String name;
    private final MsgPackProcessor<M> msgPackProcessor;
    private final long pollInterval;
    private final ExecutorService consumerExecutor;
    private final String threadPrefix;
    private final TbQueueConsumer<M> consumer;
    private Future<?> consumerTask;
    private volatile boolean stopped;

    public QueueConsumerManager(String name, MsgPackProcessor<M> msgPackProcessor, long pollInterval, Supplier<TbQueueConsumer<M>> consumerCreator, ExecutorService consumerExecutor, String threadPrefix) {
        this.name = name;
        this.pollInterval = pollInterval;
        this.msgPackProcessor = msgPackProcessor;
        this.consumerExecutor = consumerExecutor;
        this.threadPrefix = threadPrefix;
        this.consumer = consumerCreator.get();
    }

    public void subscribe() {
        this.consumer.subscribe();
    }

    public void subscribe(Set<TopicPartitionInfo> partitions) {
        this.consumer.subscribe(partitions);
    }

    public void launch() {
        log.info("[{}] Launching consumer", (Object)this.name);
        this.consumerTask = this.consumerExecutor.submit(() -> {
            if (this.threadPrefix != null) {
                ThingsBoardThreadFactory.addThreadNamePrefix((String)this.threadPrefix);
            }
            try {
                this.consumerLoop(this.consumer);
            }
            catch (Throwable e) {
                log.error("Failure in consumer loop", e);
            }
            log.info("[{}] Consumer stopped", (Object)this.name);
        });
    }

    private void consumerLoop(TbQueueConsumer<M> consumer) {
        while (!this.stopped && !consumer.isStopped()) {
            try {
                List msgs = consumer.poll(this.pollInterval);
                if (msgs.isEmpty()) continue;
                this.msgPackProcessor.process(msgs, consumer);
            }
            catch (Exception e) {
                if (consumer.isStopped()) continue;
                log.warn("Failed to process messages from queue", (Throwable)e);
                try {
                    Thread.sleep(this.pollInterval);
                }
                catch (InterruptedException interruptedException) {
                    log.trace("Failed to wait until the server has capacity to handle new requests", (Throwable)interruptedException);
                }
            }
        }
    }

    public void stop() {
        log.debug("[{}] Stopping consumer", (Object)this.name);
        this.stopped = true;
        this.consumer.unsubscribe();
        try {
            if (this.consumerTask != null) {
                this.consumerTask.get(10L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("[{}] Failed to await consumer loop stop", (Object)this.name, (Object)e);
        }
    }

    @Generated
    public static <M extends TbQueueMsg> QueueConsumerManagerBuilder<M> builder() {
        return new QueueConsumerManagerBuilder();
    }

    @Generated
    public TbQueueConsumer<M> getConsumer() {
        return this.consumer;
    }

    public static interface MsgPackProcessor<M extends TbQueueMsg> {
        public void process(List<M> var1, TbQueueConsumer<M> var2) throws Exception;
    }

    @Generated
    public static class QueueConsumerManagerBuilder<M extends TbQueueMsg> {
        @Generated
        private String name;
        @Generated
        private MsgPackProcessor<M> msgPackProcessor;
        @Generated
        private long pollInterval;
        @Generated
        private Supplier<TbQueueConsumer<M>> consumerCreator;
        @Generated
        private ExecutorService consumerExecutor;
        @Generated
        private String threadPrefix;

        @Generated
        QueueConsumerManagerBuilder() {
        }

        @Generated
        public QueueConsumerManagerBuilder<M> name(String name) {
            this.name = name;
            return this;
        }

        @Generated
        public QueueConsumerManagerBuilder<M> msgPackProcessor(MsgPackProcessor<M> msgPackProcessor) {
            this.msgPackProcessor = msgPackProcessor;
            return this;
        }

        @Generated
        public QueueConsumerManagerBuilder<M> pollInterval(long pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        @Generated
        public QueueConsumerManagerBuilder<M> consumerCreator(Supplier<TbQueueConsumer<M>> consumerCreator) {
            this.consumerCreator = consumerCreator;
            return this;
        }

        @Generated
        public QueueConsumerManagerBuilder<M> consumerExecutor(ExecutorService consumerExecutor) {
            this.consumerExecutor = consumerExecutor;
            return this;
        }

        @Generated
        public QueueConsumerManagerBuilder<M> threadPrefix(String threadPrefix) {
            this.threadPrefix = threadPrefix;
            return this;
        }

        @Generated
        public QueueConsumerManager<M> build() {
            return new QueueConsumerManager<M>(this.name, this.msgPackProcessor, this.pollInterval, this.consumerCreator, this.consumerExecutor, this.threadPrefix);
        }

        @Generated
        public String toString() {
            return "QueueConsumerManager.QueueConsumerManagerBuilder(name=" + this.name + ", msgPackProcessor=" + String.valueOf(this.msgPackProcessor) + ", pollInterval=" + this.pollInterval + ", consumerCreator=" + String.valueOf(this.consumerCreator) + ", consumerExecutor=" + String.valueOf(this.consumerExecutor) + ", threadPrefix=" + this.threadPrefix + ")";
        }
    }
}

