package org.thingsboard.server.queue.common.state;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;

/* loaded from: input_file:org/thingsboard/server/queue/common/state/KafkaQueueStateService.class */
public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
    private static final Logger log = LoggerFactory.getLogger(KafkaQueueStateService.class);
    private final PartitionedQueueConsumerManager<S> stateConsumer;
    private final Supplier<Map<String, Long>> eventsStartOffsetsProvider;
    private final Set<TopicPartitionInfo> partitionsInProgress;

    /* loaded from: input_file:org/thingsboard/server/queue/common/state/KafkaQueueStateService$KafkaQueueStateServiceBuilder.class */
    public static class KafkaQueueStateServiceBuilder<E extends TbQueueMsg, S extends TbQueueMsg> {
        private PartitionedQueueConsumerManager<E> eventConsumer;
        private PartitionedQueueConsumerManager<S> stateConsumer;
        private List<PartitionedQueueConsumerManager<?>> otherConsumers;
        private Supplier<Map<String, Long>> eventsStartOffsetsProvider;

        KafkaQueueStateServiceBuilder() {
        }

        public KafkaQueueStateServiceBuilder<E, S> eventConsumer(PartitionedQueueConsumerManager<E> partitionedQueueConsumerManager) {
            this.eventConsumer = partitionedQueueConsumerManager;
            return this;
        }

        public KafkaQueueStateServiceBuilder<E, S> stateConsumer(PartitionedQueueConsumerManager<S> partitionedQueueConsumerManager) {
            this.stateConsumer = partitionedQueueConsumerManager;
            return this;
        }

        public KafkaQueueStateServiceBuilder<E, S> otherConsumers(List<PartitionedQueueConsumerManager<?>> list) {
            this.otherConsumers = list;
            return this;
        }

        public KafkaQueueStateServiceBuilder<E, S> eventsStartOffsetsProvider(Supplier<Map<String, Long>> supplier) {
            this.eventsStartOffsetsProvider = supplier;
            return this;
        }

        public KafkaQueueStateService<E, S> build() {
            return new KafkaQueueStateService<>(this.eventConsumer, this.stateConsumer, this.otherConsumers, this.eventsStartOffsetsProvider);
        }

        public String toString() {
            return "KafkaQueueStateService.KafkaQueueStateServiceBuilder(eventConsumer=" + String.valueOf(this.eventConsumer) + ", stateConsumer=" + String.valueOf(this.stateConsumer) + ", otherConsumers=" + String.valueOf(this.otherConsumers) + ", eventsStartOffsetsProvider=" + String.valueOf(this.eventsStartOffsetsProvider) + ")";
        }
    }

    public KafkaQueueStateService(PartitionedQueueConsumerManager<E> partitionedQueueConsumerManager, PartitionedQueueConsumerManager<S> partitionedQueueConsumerManager2, List<PartitionedQueueConsumerManager<?>> list, Supplier<Map<String, Long>> supplier) {
        super(partitionedQueueConsumerManager, list != null ? list : Collections.emptyList());
        this.partitionsInProgress = ConcurrentHashMap.newKeySet();
        this.stateConsumer = partitionedQueueConsumerManager2;
        this.eventsStartOffsetsProvider = supplier;
    }

    @Override // org.thingsboard.server.queue.common.state.QueueStateService
    protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> set, Runnable runnable) {
        Map<String, Long> map = this.eventsStartOffsetsProvider != null ? this.eventsStartOffsetsProvider.get() : null;
        Set<TopicPartitionInfo> withTopic = TopicPartitionInfo.withTopic(set, this.stateConsumer.getTopic());
        this.partitionsInProgress.addAll(withTopic);
        this.stateConsumer.addPartitions(withTopic, topicPartitionInfo -> {
            Function<String, Long> function;
            Lock readLock = this.partitionsLock.readLock();
            readLock.lock();
            try {
                this.partitionsInProgress.remove(topicPartitionInfo);
                log.info("Finished partition {} (still in progress: {})", topicPartitionInfo, this.partitionsInProgress);
                if (this.partitionsInProgress.isEmpty()) {
                    log.info("All partitions processed");
                    if (runnable != null) {
                        runnable.run();
                    }
                }
                TopicPartitionInfo withTopic2 = topicPartitionInfo.withTopic(this.eventConsumer.getTopic());
                if (this.partitions.get(queueKey).contains(withTopic2)) {
                    PartitionedQueueConsumerManager<E> partitionedQueueConsumerManager = this.eventConsumer;
                    Set<TopicPartitionInfo> of = Set.of(withTopic2);
                    if (map != null) {
                        Objects.requireNonNull(map);
                        function = (v1) -> {
                            return r3.get(v1);
                        };
                    } else {
                        function = null;
                    }
                    partitionedQueueConsumerManager.addPartitions(of, null, function);
                    for (PartitionedQueueConsumerManager<?> partitionedQueueConsumerManager2 : this.otherConsumers) {
                        partitionedQueueConsumerManager2.addPartitions(Set.of(topicPartitionInfo.withTopic(partitionedQueueConsumerManager2.getTopic())));
                    }
                }
            } finally {
                readLock.unlock();
            }
        }, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.thingsboard.server.queue.common.state.QueueStateService
    public void removePartitions(QueueKey queueKey, Set<TopicPartitionInfo> set) {
        super.removePartitions(queueKey, set);
        this.stateConsumer.removePartitions(TopicPartitionInfo.withTopic(set, this.stateConsumer.getTopic()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.thingsboard.server.queue.common.state.QueueStateService
    public void deletePartitions(Set<TopicPartitionInfo> set) {
        super.deletePartitions(set);
        this.stateConsumer.delete(TopicPartitionInfo.withTopic(set, this.stateConsumer.getTopic()));
    }

    @Override // org.thingsboard.server.queue.common.state.QueueStateService
    public void stop() {
        super.stop();
        this.stateConsumer.stop();
        this.stateConsumer.awaitStop();
    }

    public static <E extends TbQueueMsg, S extends TbQueueMsg> KafkaQueueStateServiceBuilder<E, S> builder() {
        return new KafkaQueueStateServiceBuilder<>();
    }
}
