/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common.state;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.QueueStateService;
import org.thingsboard.server.queue.discovery.QueueKey;

/**
 * {@link QueueStateService} implementation that manages a dedicated state consumer in
 * addition to the event consumer and the "other" consumers handled by the base class.
 *
 * <p>When partitions are added, they are first handed to the state consumer (re-targeted to
 * the state topic). The event consumer and the other consumers are subscribed to a partition
 * only from the callback passed to the state consumer for that partition.
 *
 * @param <E> message type of the event consumer
 * @param <S> message type of the state consumer
 */
public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
        extends QueueStateService<E, S> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaQueueStateService.class);

    private final PartitionedQueueConsumerManager<S> stateConsumer;
    /** Optional (nullable) source of the offsets snapshot passed to the event consumer. */
    private final Supplier<Map<String, Long>> eventsStartOffsetsProvider;
    /** State partitions handed to the state consumer whose callback has not fired yet. */
    private final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet();

    /**
     * Creates the service.
     *
     * @param eventConsumer              consumer of the event topic, passed to the base class
     * @param stateConsumer              consumer of the state topic
     * @param otherConsumers             additional consumers; {@code null} is treated as an empty list
     * @param eventsStartOffsetsProvider optional supplier of start offsets for the event consumer;
     *                                   may be {@code null}
     */
    public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer, PartitionedQueueConsumerManager<S> stateConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers, Supplier<Map<String, Long>> eventsStartOffsetsProvider) {
        super(eventConsumer, otherConsumers != null ? otherConsumers : Collections.emptyList());
        this.stateConsumer = stateConsumer;
        this.eventsStartOffsetsProvider = eventsStartOffsetsProvider;
    }

    /**
     * Subscribes the state consumer to the given partitions (mapped onto the state topic) and
     * registers a per-partition callback that subsequently subscribes the event consumer and
     * the other consumers.
     *
     * @param queueKey         key used to look up the currently registered partitions
     * @param partitions       partitions being added
     * @param whenAllProcessed optional action run when no state partition remains in progress;
     *                         may be {@code null}
     */
    @Override
    protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, Runnable whenAllProcessed) {
        // The offsets snapshot is taken before the state consumer is subscribed.
        Map<String, Long> eventsStartOffsets = this.eventsStartOffsetsProvider != null ? this.eventsStartOffsetsProvider.get() : null;
        Set<TopicPartitionInfo> statePartitions = TopicPartitionInfo.withTopic(partitions, this.stateConsumer.getTopic());
        this.partitionsInProgress.addAll(statePartitions);
        this.stateConsumer.addPartitions(
                statePartitions,
                (TopicPartitionInfo statePartition) -> this.onStatePartitionFinished(queueKey, statePartition, eventsStartOffsets, whenAllProcessed),
                null);
    }

    /**
     * Callback body for a single state partition: marks it as finished, fires
     * {@code whenAllProcessed} once nothing is left in progress, and subscribes the event
     * consumer and the other consumers if the partition is still registered for the queue key.
     */
    private void onStatePartitionFinished(QueueKey queueKey, TopicPartitionInfo statePartition, Map<String, Long> eventsStartOffsets, Runnable whenAllProcessed) {
        // NOTE(review): only the read lock is taken, so callbacks are presumably not mutually
        // exclusive with each other - confirm whenAllProcessed tolerates repeated invocation.
        Lock readLock = this.partitionsLock.readLock();
        readLock.lock();
        try {
            this.partitionsInProgress.remove(statePartition);
            log.info("Finished partition {} (still in progress: {})", statePartition, this.partitionsInProgress);
            if (this.partitionsInProgress.isEmpty()) {
                log.info("All partitions processed");
                if (whenAllProcessed != null) {
                    whenAllProcessed.run();
                }
            }

            TopicPartitionInfo eventPartition = statePartition.withTopic(this.eventConsumer.getTopic());
            Set<TopicPartitionInfo> registered = this.partitions.get(queueKey);
            if (registered == null) {
                // Defensive: avoid a NullPointerException on the consumer callback thread
                // when nothing is registered under this key.
                log.warn("No partitions registered for {}, skipping subscription for {}", queueKey, eventPartition);
                return;
            }
            // Subscribe only if the partition is still registered for this queue key.
            if (registered.contains(eventPartition)) {
                this.eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null);
                for (PartitionedQueueConsumerManager<?> consumer : this.otherConsumers) {
                    consumer.addPartitions(Set.of(statePartition.withTopic(consumer.getTopic())));
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Delegates to the base class, then removes the corresponding state-topic partitions from
     * the state consumer.
     */
    @Override
    protected void removePartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
        super.removePartitions(queueKey, partitions);
        this.stateConsumer.removePartitions(TopicPartitionInfo.withTopic(partitions, this.stateConsumer.getTopic()));
    }

    /**
     * Delegates to the base class, then calls {@code delete} on the state consumer for the
     * corresponding state-topic partitions.
     */
    @Override
    protected void deletePartitions(Set<TopicPartitionInfo> partitions) {
        super.deletePartitions(partitions);
        this.stateConsumer.delete(TopicPartitionInfo.withTopic(partitions, this.stateConsumer.getTopic()));
    }

    /**
     * Stops the base service, then stops the state consumer and waits for it to stop.
     * The state consumer is shut down even when {@code super.stop()} throws.
     */
    @Override
    public void stop() {
        try {
            super.stop();
        } finally {
            this.stateConsumer.stop();
            this.stateConsumer.awaitStop();
        }
    }

    /**
     * Creates an empty builder for {@link KafkaQueueStateService}.
     */
    @Generated
    public static <E extends TbQueueMsg, S extends TbQueueMsg> KafkaQueueStateServiceBuilder<E, S> builder() {
        return new KafkaQueueStateServiceBuilder<>();
    }

    /**
     * Fluent builder collecting the four constructor arguments of {@link KafkaQueueStateService}.
     * Values that are never set stay {@code null}.
     */
    @Generated
    public static class KafkaQueueStateServiceBuilder<E extends TbQueueMsg, S extends TbQueueMsg> {
        @Generated
        private PartitionedQueueConsumerManager<E> eventConsumer;
        @Generated
        private PartitionedQueueConsumerManager<S> stateConsumer;
        @Generated
        private List<PartitionedQueueConsumerManager<?>> otherConsumers;
        @Generated
        private Supplier<Map<String, Long>> eventsStartOffsetsProvider;

        @Generated
        KafkaQueueStateServiceBuilder() {
        }

        @Generated
        public KafkaQueueStateServiceBuilder<E, S> eventConsumer(PartitionedQueueConsumerManager<E> eventConsumer) {
            this.eventConsumer = eventConsumer;
            return this;
        }

        @Generated
        public KafkaQueueStateServiceBuilder<E, S> stateConsumer(PartitionedQueueConsumerManager<S> stateConsumer) {
            this.stateConsumer = stateConsumer;
            return this;
        }

        @Generated
        public KafkaQueueStateServiceBuilder<E, S> otherConsumers(List<PartitionedQueueConsumerManager<?>> otherConsumers) {
            this.otherConsumers = otherConsumers;
            return this;
        }

        @Generated
        public KafkaQueueStateServiceBuilder<E, S> eventsStartOffsetsProvider(Supplier<Map<String, Long>> eventsStartOffsetsProvider) {
            this.eventsStartOffsetsProvider = eventsStartOffsetsProvider;
            return this;
        }

        /**
         * Builds the service from the values collected so far.
         */
        @Generated
        public KafkaQueueStateService<E, S> build() {
            return new KafkaQueueStateService<>(this.eventConsumer, this.stateConsumer, this.otherConsumers, this.eventsStartOffsetsProvider);
        }

        @Generated
        @Override
        public String toString() {
            // String concatenation already renders null as "null", so no String.valueOf is needed.
            return "KafkaQueueStateService.KafkaQueueStateServiceBuilder(eventConsumer=" + this.eventConsumer
                    + ", stateConsumer=" + this.stateConsumer
                    + ", otherConsumers=" + this.otherConsumers
                    + ", eventsStartOffsetsProvider=" + this.eventsStartOffsetsProvider + ")";
        }
    }
}

