package org.thingsboard.server.queue.common;

import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;

/* loaded from: input_file:org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.class */
public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractTbQueueConsumerTemplate.class);
    public static final long ONE_MILLISECOND_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
    private volatile boolean subscribed;
    protected volatile Set<TopicPartitionInfo> partitions;
    private final String topic;
    protected volatile boolean stopped = false;
    protected final ReentrantLock consumerLock = new ReentrantLock();
    final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue();

    public AbstractTbQueueConsumerTemplate(String str) {
        this.topic = str;
    }

    public void subscribe() {
        log.debug("enqueue topic subscribe {} ", this.topic);
        if (this.stopped) {
            log.error("trying subscribe, but consumer stopped for topic {}", this.topic);
        } else {
            this.subscribeQueue.add(Collections.singleton(new TopicPartitionInfo(this.topic, (TenantId) null, (Integer) null, true)));
        }
    }

    public void subscribe(Set<TopicPartitionInfo> set) {
        log.debug("enqueue topics subscribe {} ", set);
        if (this.stopped) {
            log.error("trying subscribe, but consumer stopped for topic {}", this.topic);
        } else {
            this.subscribeQueue.add(set);
        }
    }

    public List<T> poll(long j) {
        long nanoTime = System.nanoTime();
        if (this.stopped) {
            log.error("poll invoked but consumer stopped for topic " + this.topic, new RuntimeException("stacktrace"));
            return Collections.emptyList();
        }
        if (!this.subscribed && this.partitions == null && this.subscribeQueue.isEmpty()) {
            return sleepAndReturnEmpty(nanoTime, j);
        }
        if (this.consumerLock.isLocked()) {
            log.error("poll. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic " + this.topic, new RuntimeException("stacktrace"));
        }
        this.consumerLock.lock();
        while (!this.subscribeQueue.isEmpty()) {
            try {
                this.subscribed = false;
                this.partitions = this.subscribeQueue.poll();
            } finally {
                this.consumerLock.unlock();
            }
        }
        if (!this.subscribed) {
            log.info("Subscribing to {}", this.partitions);
            doSubscribe(this.partitions);
            this.subscribed = true;
        }
        List<R> emptyList = this.partitions.isEmpty() ? Collections.emptyList() : doPoll(j);
        return (!emptyList.isEmpty() || isLongPollingSupported()) ? decodeRecords(emptyList) : sleepAndReturnEmpty(nanoTime, j);
    }

    @Nonnull
    List<T> decodeRecords(@Nonnull List<R> list) {
        ArrayList arrayList = new ArrayList(list.size());
        list.forEach(obj -> {
            if (obj != 0) {
                try {
                    arrayList.add(decode(obj));
                } catch (Exception e) {
                    log.error("Failed to decode record {}", obj, e);
                    throw new RuntimeException("Failed to decode record " + String.valueOf(obj), e);
                }
            }
        });
        return arrayList;
    }

    List<T> sleepAndReturnEmpty(long j, long j2) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(j2) - (System.nanoTime() - j);
        if (nanos >= ONE_MILLISECOND_IN_NANOS) {
            try {
                long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
                log.trace("Going to sleep after poll: topic {} for {}ms", this.topic, Long.valueOf(millis));
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                if (!this.stopped) {
                    log.error("Failed to wait", e);
                }
            }
        }
        return Collections.emptyList();
    }

    public void commit() {
        if (this.consumerLock.isLocked()) {
            if (this.stopped) {
                return;
            } else {
                log.error("commit. consumerLock is locked. will wait with no timeout. it looks like a race conditions or deadlock topic " + this.topic, new RuntimeException("stacktrace"));
            }
        }
        this.consumerLock.lock();
        try {
            doCommit();
        } finally {
            this.consumerLock.unlock();
        }
    }

    public void stop() {
        this.stopped = true;
    }

    public void unsubscribe() {
        log.info("Unsubscribing and stopping consumer for {}", this.partitions);
        this.stopped = true;
        this.consumerLock.lock();
        try {
            if (this.subscribed) {
                doUnsubscribe();
            }
        } finally {
            this.consumerLock.unlock();
        }
    }

    public boolean isStopped() {
        return this.stopped;
    }

    protected abstract List<R> doPoll(long j);

    protected abstract T decode(R r) throws IOException;

    protected abstract void doSubscribe(Set<TopicPartitionInfo> set);

    protected abstract void doCommit();

    protected abstract void doUnsubscribe();

    public List<String> getFullTopicNames() {
        return this.partitions == null ? Collections.emptyList() : this.partitions.stream().map((v0) -> {
            return v0.getFullTopicName();
        }).toList();
    }

    protected boolean isLongPollingSupported() {
        return false;
    }

    public String getTopic() {
        return this.topic;
    }
}
