/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;

public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg>
implements TbQueueConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractTbQueueConsumerTemplate.class);
    public static final long ONE_MILLISECOND_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(1L);
    private volatile boolean subscribed;
    protected volatile boolean stopped = false;
    protected volatile Set<TopicPartitionInfo> partitions;
    protected final ReentrantLock consumerLock = new ReentrantLock();
    final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<Set<TopicPartitionInfo>>();
    private final String topic;

    public AbstractTbQueueConsumerTemplate(String topic) {
        this.topic = topic;
    }

    public void subscribe() {
        log.debug("enqueue topic subscribe {} ", (Object)this.topic);
        if (this.stopped) {
            log.error("trying subscribe, but consumer stopped for topic {}", (Object)this.topic);
            return;
        }
        this.subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(this.topic, null, null, true)));
    }

    public void subscribe(Set<TopicPartitionInfo> partitions) {
        log.debug("enqueue topics subscribe {} ", partitions);
        if (this.stopped) {
            log.error("trying subscribe, but consumer stopped for topic {}", (Object)this.topic);
            return;
        }
        this.subscribeQueue.add(partitions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<T> poll(long durationInMillis) {
        List records;
        long startNanos = System.nanoTime();
        if (this.stopped) {
            log.error("poll invoked but consumer stopped for topic " + this.topic, (Throwable)new RuntimeException("stacktrace"));
            return Collections.emptyList();
        }
        if (!this.subscribed && this.partitions == null && this.subscribeQueue.isEmpty()) {
            return this.sleepAndReturnEmpty(startNanos, durationInMillis);
        }
        if (this.consumerLock.isLocked()) {
            log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic " + this.topic, (Throwable)new RuntimeException("stacktrace"));
        }
        this.consumerLock.lock();
        try {
            while (!this.subscribeQueue.isEmpty()) {
                this.subscribed = false;
                this.partitions = this.subscribeQueue.poll();
            }
            if (!this.subscribed) {
                List<String> topicNames = this.getFullTopicNames();
                log.info("Subscribing to topics {}", topicNames);
                this.doSubscribe(topicNames);
                this.subscribed = true;
            }
            records = this.partitions.isEmpty() ? Collections.emptyList() : this.doPoll(durationInMillis);
        }
        finally {
            this.consumerLock.unlock();
        }
        if (records.isEmpty() && !this.isLongPollingSupported()) {
            return this.sleepAndReturnEmpty(startNanos, durationInMillis);
        }
        return this.decodeRecords(records);
    }

    @Nonnull
    List<T> decodeRecords(@Nonnull List<R> records) {
        ArrayList result = new ArrayList(records.size());
        records.forEach(record -> {
            try {
                if (record != null) {
                    result.add(this.decode(record));
                }
            }
            catch (IOException e) {
                log.error("Failed decode record: [{}]", record);
                throw new RuntimeException("Failed to decode record: ", e);
            }
        });
        return result;
    }

    List<T> sleepAndReturnEmpty(long startNanos, long durationInMillis) {
        block3: {
            long spentNanos;
            long durationNanos = TimeUnit.MILLISECONDS.toNanos(durationInMillis);
            long nanosLeft = durationNanos - (spentNanos = System.nanoTime() - startNanos);
            if (nanosLeft >= ONE_MILLISECOND_IN_NANOS) {
                try {
                    long sleepMs = TimeUnit.NANOSECONDS.toMillis(nanosLeft);
                    log.trace("Going to sleep after poll: topic {} for {}ms", (Object)this.topic, (Object)sleepMs);
                    Thread.sleep(sleepMs);
                }
                catch (InterruptedException e) {
                    if (this.stopped) break block3;
                    log.error("Failed to wait", (Throwable)e);
                }
            }
        }
        return Collections.emptyList();
    }

    public void commit() {
        if (this.consumerLock.isLocked()) {
            log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic " + this.topic, (Throwable)new RuntimeException("stacktrace"));
        }
        this.consumerLock.lock();
        try {
            this.doCommit();
        }
        finally {
            this.consumerLock.unlock();
        }
    }

    public void stop() {
        this.stopped = true;
    }

    public void unsubscribe() {
        log.info("Unsubscribing and stopping consumer for topics {}", this.getFullTopicNames());
        this.stopped = true;
        this.consumerLock.lock();
        try {
            if (this.subscribed) {
                this.doUnsubscribe();
            }
        }
        finally {
            this.consumerLock.unlock();
        }
    }

    public boolean isStopped() {
        return this.stopped;
    }

    protected abstract List<R> doPoll(long var1);

    protected abstract T decode(R var1) throws IOException;

    protected abstract void doSubscribe(List<String> var1);

    protected abstract void doCommit();

    protected abstract void doUnsubscribe();

    public List<String> getFullTopicNames() {
        if (this.partitions == null) {
            return Collections.emptyList();
        }
        return this.partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
    }

    protected boolean isLongPollingSupported() {
        return false;
    }

    public String getTopic() {
        return this.topic;
    }
}

