/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
import org.thingsboard.server.queue.kafka.KafkaTbQueueMsg;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaDecoder;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;

public class TbKafkaConsumerTemplate<T extends TbQueueMsg>
extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TbKafkaConsumerTemplate.class);
    private final TbKafkaAdmin admin;
    private final KafkaConsumer<String, byte[]> consumer;
    private final TbKafkaDecoder<T> decoder;
    private final TbKafkaConsumerStatsService statsService;
    private final String groupId;
    private Function<String, Long> startOffsetProvider;
    private final boolean readFromBeginning;
    private final boolean stopWhenRead;
    private int readCount;
    private Map<Integer, Long> endOffsets;

    private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, String clientId, String groupId, String topic, TbQueueAdmin admin, TbKafkaConsumerStatsService statsService, boolean readFromBeginning, boolean stopWhenRead) {
        super(topic);
        Properties props = settings.toConsumerProps(topic);
        props.put("client.id", clientId);
        if (groupId != null) {
            props.put("group.id", groupId);
        }
        this.statsService = statsService;
        this.groupId = groupId;
        if (statsService != null) {
            statsService.registerClientGroup(groupId);
        }
        this.admin = (TbKafkaAdmin)admin;
        this.consumer = new KafkaConsumer(props);
        this.decoder = decoder;
        this.readFromBeginning = readFromBeginning;
        this.stopWhenRead = stopWhenRead;
    }

    @Override
    protected void doSubscribe(Set<TopicPartitionInfo> partitions) {
        Map<String, List> topics;
        if (partitions == null) {
            topics = Collections.emptyMap();
        } else {
            topics = new HashMap();
            partitions.forEach(tpi -> {
                if (tpi.isUseInternalPartition()) {
                    topics.computeIfAbsent(tpi.getFullTopicName(), t -> new ArrayList()).add((Integer)tpi.getPartition().get());
                } else {
                    topics.put(tpi.getFullTopicName(), null);
                }
            });
        }
        if (!topics.isEmpty()) {
            topics.keySet().forEach(arg_0 -> ((TbKafkaAdmin)this.admin).createTopicIfNotExists(arg_0));
            ArrayList toSubscribe = new ArrayList();
            topics.forEach((topic, kafkaPartitions) -> {
                if (kafkaPartitions == null) {
                    if (this.groupId != null) {
                        toSubscribe.add(topic);
                        return;
                    }
                    kafkaPartitions = IntStream.range(0, this.admin.getNumPartitions()).boxed().toList();
                }
                List<TopicPartition> topicPartitions = kafkaPartitions.stream().map(partition -> new TopicPartition(topic, partition.intValue())).toList();
                this.consumer.assign(topicPartitions);
                this.onPartitionsAssigned(topicPartitions);
            });
            if (!toSubscribe.isEmpty()) {
                if (this.readFromBeginning || this.stopWhenRead) {
                    this.consumer.subscribe(toSubscribe, new ConsumerRebalanceListener(){

                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        }

                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            log.debug("Handling onPartitionsAssigned {}", partitions);
                            TbKafkaConsumerTemplate.this.onPartitionsAssigned(partitions);
                        }
                    });
                } else {
                    this.consumer.subscribe(toSubscribe);
                }
            }
        } else {
            log.info("unsubscribe due to empty topic list");
            this.consumer.unsubscribe();
        }
    }

    @Override
    protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
        List<ConsumerRecord<String, byte[]>> recordList;
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        log.trace("poll topic {} maxDuration {}", (Object)this.getTopic(), (Object)durationInMillis);
        ConsumerRecords records = this.consumer.poll(Duration.ofMillis(durationInMillis));
        stopWatch.stop();
        log.trace("poll topic {} took {}ms", (Object)this.getTopic(), (Object)stopWatch.getTotalTimeMillis());
        if (records.isEmpty()) {
            recordList = Collections.emptyList();
        } else {
            recordList = new ArrayList(256);
            records.forEach(record -> {
                recordList.add((ConsumerRecord<String, byte[]>)record);
                if (this.stopWhenRead && this.endOffsets != null) {
                    ++this.readCount;
                    int partition = record.partition();
                    Long endOffset = this.endOffsets.get(partition);
                    if (endOffset == null) {
                        log.debug("End offset not found for {} [{}]", (Object)record.topic(), (Object)partition);
                        return;
                    }
                    log.trace("[{}-{}] Got record offset {}, expected end offset: {}", new Object[]{record.topic(), partition, record.offset(), endOffset - 1L});
                    if (record.offset() >= endOffset - 1L) {
                        this.endOffsets.remove(partition);
                    }
                }
            });
        }
        if (this.stopWhenRead && this.endOffsets != null && this.endOffsets.isEmpty()) {
            log.info("Finished reading {}, processed {} messages", (Object)this.partitions, (Object)this.readCount);
            this.stop();
        }
        return recordList;
    }

    private void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (this.readFromBeginning) {
            log.debug("Seeking to beginning for {}", partitions);
            this.consumer.seekToBeginning(partitions);
        } else if (this.startOffsetProvider != null) {
            partitions.forEach(topicPartition -> {
                Long offset = this.startOffsetProvider.apply(topicPartition.topic());
                if (offset != null) {
                    log.debug("Seeking to offset {} for {}", (Object)offset, topicPartition);
                    this.consumer.seek(topicPartition, offset.longValue());
                } else {
                    log.info("No start offset provided for {}", topicPartition);
                }
            });
        }
        if (this.stopWhenRead) {
            log.debug("Getting end offsets for {}", partitions);
            this.endOffsets = this.consumer.endOffsets(partitions).entrySet().stream().filter(entry -> (Long)entry.getValue() > 0L).collect(Collectors.toMap(entry -> ((TopicPartition)entry.getKey()).partition(), Map.Entry::getValue));
        }
    }

    @Override
    public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
        return (T)((TbQueueMsg)this.decoder.decode(new KafkaTbQueueMsg(record)));
    }

    @Override
    protected void doCommit() {
        if (this.groupId != null) {
            this.consumer.commitSync();
        }
    }

    @Override
    protected void doUnsubscribe() {
        if (this.consumer != null) {
            this.consumer.unsubscribe();
            this.consumer.close();
        }
        if (this.statsService != null) {
            this.statsService.unregisterClientGroup(this.groupId);
        }
    }

    @Override
    public boolean isLongPollingSupported() {
        return true;
    }

    @Generated
    public static <T extends TbQueueMsg> TbKafkaConsumerTemplateBuilder<T> builder() {
        return new TbKafkaConsumerTemplateBuilder();
    }

    @Generated
    public String getGroupId() {
        return this.groupId;
    }

    @Generated
    public void setStartOffsetProvider(Function<String, Long> startOffsetProvider) {
        this.startOffsetProvider = startOffsetProvider;
    }

    @Generated
    public static class TbKafkaConsumerTemplateBuilder<T extends TbQueueMsg> {
        @Generated
        private TbKafkaSettings settings;
        @Generated
        private TbKafkaDecoder<T> decoder;
        @Generated
        private String clientId;
        @Generated
        private String groupId;
        @Generated
        private String topic;
        @Generated
        private TbQueueAdmin admin;
        @Generated
        private TbKafkaConsumerStatsService statsService;
        @Generated
        private boolean readFromBeginning;
        @Generated
        private boolean stopWhenRead;

        @Generated
        TbKafkaConsumerTemplateBuilder() {
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> settings(TbKafkaSettings settings) {
            this.settings = settings;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> decoder(TbKafkaDecoder<T> decoder) {
            this.decoder = decoder;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> groupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> topic(String topic) {
            this.topic = topic;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> admin(TbQueueAdmin admin) {
            this.admin = admin;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> statsService(TbKafkaConsumerStatsService statsService) {
            this.statsService = statsService;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> readFromBeginning(boolean readFromBeginning) {
            this.readFromBeginning = readFromBeginning;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplateBuilder<T> stopWhenRead(boolean stopWhenRead) {
            this.stopWhenRead = stopWhenRead;
            return this;
        }

        @Generated
        public TbKafkaConsumerTemplate<T> build() {
            return new TbKafkaConsumerTemplate<T>(this.settings, this.decoder, this.clientId, this.groupId, this.topic, this.admin, this.statsService, this.readFromBeginning, this.stopWhenRead);
        }

        @Generated
        public String toString() {
            return "TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder(settings=" + String.valueOf(this.settings) + ", decoder=" + String.valueOf(this.decoder) + ", clientId=" + this.clientId + ", groupId=" + this.groupId + ", topic=" + this.topic + ", admin=" + String.valueOf(this.admin) + ", statsService=" + String.valueOf(this.statsService) + ", readFromBeginning=" + this.readFromBeginning + ", stopWhenRead=" + this.stopWhenRead + ")";
        }
    }
}

