package org.thingsboard.server.queue.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;

/* loaded from: input_file:org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.class */
/**
 * Queue consumer backed by a {@link KafkaConsumer} with {@code String} keys and {@code byte[]} values.
 *
 * <p>Topics are either subscribed through consumer-group management (when a group id is configured)
 * or their partitions are assigned manually. Optionally the consumer seeks to the beginning of every
 * assigned partition ({@code readFromBeginning}) and/or stops itself once the end offsets captured at
 * assignment time have been reached ({@code stopWhenRead}).
 *
 * @param <T> the decoded message type produced by {@link #decode(ConsumerRecord)}
 */
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
    private static final Logger log = LoggerFactory.getLogger(TbKafkaConsumerTemplate.class);

    private final TbKafkaAdmin admin;
    private final KafkaConsumer<String, byte[]> consumer;
    private final TbKafkaDecoder<T> decoder;
    private final TbKafkaConsumerStatsService statsService;
    private final String groupId;
    private final boolean readFromBeginning;
    private final boolean stopWhenRead;

    /** Optional per-topic start offset lookup; consulted only when {@code readFromBeginning} is false. */
    private Function<String, Long> startOffsetProvider;
    /** Number of records seen while end-offset tracking is active. */
    private int readCount;
    /**
     * End offsets still to be reached, keyed by topic-partition. {@code null} until partitions have been
     * assigned with {@code stopWhenRead} enabled; an empty map means everything has been read.
     */
    private Map<TopicPartition, Long> endOffsets;

    /**
     * Fluent builder for {@link TbKafkaConsumerTemplate}.
     *
     * @param <T> the decoded message type of the consumer being built
     */
    public static class TbKafkaConsumerTemplateBuilder<T extends TbQueueMsg> {
        private TbKafkaSettings settings;
        private TbKafkaDecoder<T> decoder;
        private String clientId;
        private String groupId;
        private String topic;
        private TbQueueAdmin admin;
        private TbKafkaConsumerStatsService statsService;
        private boolean readFromBeginning;
        private boolean stopWhenRead;

        TbKafkaConsumerTemplateBuilder() {
        }

        /** Sets the settings object the consumer properties are derived from. */
        public TbKafkaConsumerTemplateBuilder<T> settings(TbKafkaSettings settings) {
            this.settings = settings;
            return this;
        }

        /** Sets the decoder used to turn raw records into messages. */
        public TbKafkaConsumerTemplateBuilder<T> decoder(TbKafkaDecoder<T> decoder) {
            this.decoder = decoder;
            return this;
        }

        /** Sets the value of the {@code client.id} consumer property. */
        public TbKafkaConsumerTemplateBuilder<T> clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        /** Sets the value of the {@code group.id} consumer property; may be left {@code null}. */
        public TbKafkaConsumerTemplateBuilder<T> groupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        /** Sets the topic passed to the base consumer template. */
        public TbKafkaConsumerTemplateBuilder<T> topic(String topic) {
            this.topic = topic;
            return this;
        }

        /** Sets the queue admin; the consumer casts it to {@link TbKafkaAdmin}. */
        public TbKafkaConsumerTemplateBuilder<T> admin(TbQueueAdmin admin) {
            this.admin = admin;
            return this;
        }

        /** Sets the optional stats service the consumer group is registered with. */
        public TbKafkaConsumerTemplateBuilder<T> statsService(TbKafkaConsumerStatsService statsService) {
            this.statsService = statsService;
            return this;
        }

        /** Controls whether assigned partitions are rewound to their beginning. */
        public TbKafkaConsumerTemplateBuilder<T> readFromBeginning(boolean readFromBeginning) {
            this.readFromBeginning = readFromBeginning;
            return this;
        }

        /** Controls whether the consumer stops once the captured end offsets are reached. */
        public TbKafkaConsumerTemplateBuilder<T> stopWhenRead(boolean stopWhenRead) {
            this.stopWhenRead = stopWhenRead;
            return this;
        }

        /** Creates the consumer from the values collected so far. */
        public TbKafkaConsumerTemplate<T> build() {
            return new TbKafkaConsumerTemplate<>(settings, decoder, clientId, groupId, topic, admin, statsService, readFromBeginning, stopWhenRead);
        }

        @Override
        public String toString() {
            return "TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder(settings=" + settings
                    + ", decoder=" + decoder
                    + ", clientId=" + clientId
                    + ", groupId=" + groupId
                    + ", topic=" + topic
                    + ", admin=" + admin
                    + ", statsService=" + statsService
                    + ", readFromBeginning=" + readFromBeginning
                    + ", stopWhenRead=" + stopWhenRead + ")";
        }
    }

    private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, String clientId, String groupId, String topic,
                                    TbQueueAdmin admin, TbKafkaConsumerStatsService statsService,
                                    boolean readFromBeginning, boolean stopWhenRead) {
        super(topic);
        Properties props = settings.toConsumerProps(topic);
        props.put("client.id", clientId);
        if (groupId != null) {
            props.put("group.id", groupId);
        }
        this.statsService = statsService;
        this.groupId = groupId;
        this.admin = (TbKafkaAdmin) admin;
        this.decoder = decoder;
        this.readFromBeginning = readFromBeginning;
        this.stopWhenRead = stopWhenRead;
        this.consumer = new KafkaConsumer<>(props);
        // Register only after the consumer exists, so a failed construction does not leave
        // a client group registered that nothing will ever unregister.
        if (statsService != null) {
            statsService.registerClientGroup(groupId);
        }
    }

    /**
     * Subscribes to, or manually assigns, the given partitions.
     *
     * <p>Topics without an explicit partition are subscribed via the consumer group when a group id is
     * set; otherwise partitions {@code 0..admin.getNumPartitions()-1} are assigned manually. Topics with
     * explicit partitions are always assigned manually. A {@code null} or empty set unsubscribes.
     *
     * @param partitions the partitions to consume, may be {@code null}
     */
    @Override
    protected void doSubscribe(Set<TopicPartitionInfo> partitions) {
        Map<String, List<Integer>> topics = groupPartitionsByTopic(partitions);
        if (topics.isEmpty()) {
            log.info("unsubscribe due to empty topic list");
            consumer.unsubscribe();
            return;
        }
        topics.keySet().forEach(admin::createTopicIfNotExists);

        List<String> toSubscribe = new ArrayList<>();
        List<TopicPartition> toAssign = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : topics.entrySet()) {
            String topic = entry.getKey();
            List<Integer> kafkaPartitions = entry.getValue();
            if (kafkaPartitions == null) {
                if (groupId != null) {
                    toSubscribe.add(topic);
                    continue;
                }
                // No consumer group management: manually assign all partitions of the topic.
                kafkaPartitions = IntStream.range(0, admin.getNumPartitions()).boxed().toList();
            }
            for (Integer partition : kafkaPartitions) {
                toAssign.add(new TopicPartition(topic, partition));
            }
        }

        if (!toAssign.isEmpty()) {
            // KafkaConsumer.assign replaces any previous assignment, so every manually assigned
            // partition has to go into a single call; assigning topic by topic would keep only the last.
            consumer.assign(toAssign);
            onPartitionsAssigned(toAssign);
        }
        if (toSubscribe.isEmpty()) {
            return;
        }
        // NOTE(review): Kafka documents manual assignment and topic subscription as mutually exclusive,
        // so a partition set that needs both modes is expected to fail in subscribe() - confirm callers never mix them.
        if (readFromBeginning || stopWhenRead) {
            consumer.subscribe(toSubscribe, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
                    log.debug("Handling onPartitionsAssigned {}", assigned);
                    TbKafkaConsumerTemplate.this.onPartitionsAssigned(assigned);
                }
            });
        } else {
            consumer.subscribe(toSubscribe);
        }
    }

    /**
     * Groups the requested partitions by full topic name. A {@code null} value marks a topic for which
     * no explicit partition was requested.
     */
    private static Map<String, List<Integer>> groupPartitionsByTopic(Set<TopicPartitionInfo> partitions) {
        if (partitions == null) {
            return Collections.emptyMap();
        }
        Map<String, List<Integer>> topics = new HashMap<>();
        for (TopicPartitionInfo tpi : partitions) {
            if (tpi.isUseInternalPartition()) {
                topics.computeIfAbsent(tpi.getFullTopicName(), t -> new ArrayList<>()).add(tpi.getPartition().get());
            } else {
                topics.put(tpi.getFullTopicName(), null);
            }
        }
        return topics;
    }

    /**
     * Polls Kafka once and returns the received records.
     *
     * <p>When {@code stopWhenRead} is enabled and end offsets have been captured, each record is checked
     * against them; once no end offsets remain, {@code stop()} is called.
     *
     * @param durationInMillis maximum time to block in the poll, in milliseconds
     * @return the polled records, or an empty list when nothing was received
     */
    @Override
    protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        log.trace("poll topic {} maxDuration {}", getTopic(), durationInMillis);
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
        stopWatch.stop();
        log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis());

        List<ConsumerRecord<String, byte[]>> recordList;
        if (records.isEmpty()) {
            recordList = Collections.emptyList();
        } else {
            recordList = new ArrayList<>(records.count());
            for (ConsumerRecord<String, byte[]> record : records) {
                recordList.add(record);
                trackReadProgress(record);
            }
        }
        if (stopWhenRead && endOffsets != null && endOffsets.isEmpty()) {
            log.info("Finished reading {}, processed {} messages", this.partitions, readCount);
            stop();
        }
        return recordList;
    }

    /** Counts the record and drops its partition from {@link #endOffsets} once the last expected offset is seen. */
    private void trackReadProgress(ConsumerRecord<String, byte[]> record) {
        if (!stopWhenRead || endOffsets == null) {
            return;
        }
        readCount++;
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        Long endOffset = endOffsets.get(topicPartition);
        if (endOffset == null) {
            log.debug("End offset not found for {} [{}]", record.topic(), record.partition());
            return;
        }
        // The captured end offset is compared against offset + 1, hence the "- 1" here.
        long lastExpectedOffset = endOffset - 1;
        log.trace("[{}-{}] Got record offset {}, expected end offset: {}",
                record.topic(), record.partition(), record.offset(), lastExpectedOffset);
        if (record.offset() >= lastExpectedOffset) {
            endOffsets.remove(topicPartition);
        }
    }

    /**
     * Positions the consumer on newly assigned partitions and, when {@code stopWhenRead} is enabled,
     * captures the end offsets that mark the point at which reading is considered finished.
     */
    private void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        Function<String, Long> offsetProvider = startOffsetProvider;
        if (readFromBeginning) {
            log.debug("Seeking to beginning for {}", assigned);
            consumer.seekToBeginning(assigned);
        } else if (offsetProvider != null) {
            for (TopicPartition topicPartition : assigned) {
                Long offset = offsetProvider.apply(topicPartition.topic());
                if (offset == null) {
                    log.info("No start offset provided for {}", topicPartition);
                } else {
                    log.debug("Seeking to offset {} for {}", offset, topicPartition);
                    consumer.seek(topicPartition, offset);
                }
            }
        }
        if (stopWhenRead) {
            log.debug("Getting end offsets for {}", assigned);
            // Built as an explicit HashMap because entries are removed while polling.
            Map<TopicPartition, Long> pending = new HashMap<>();
            consumer.endOffsets(assigned).forEach((topicPartition, endOffset) -> {
                // Partitions whose end offset is 0 are not tracked.
                if (endOffset > 0) {
                    pending.put(topicPartition, endOffset);
                }
            });
            endOffsets = pending;
        }
    }

    /**
     * Decodes a raw Kafka record with the configured decoder.
     *
     * @param record the record to decode
     * @return the decoded message
     * @throws IOException if the decoder fails
     */
    @Override
    public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
        return decoder.decode(new KafkaTbQueueMsg(record));
    }

    /** Synchronously commits offsets; does nothing when no group id is configured. */
    @Override
    protected void doCommit() {
        if (groupId != null) {
            consumer.commitSync();
        }
    }

    /**
     * Unsubscribes and closes the Kafka consumer, then unregisters the client group from the stats
     * service. Each cleanup step runs even if an earlier one throws.
     */
    @Override
    protected void doUnsubscribe() {
        try {
            try {
                consumer.unsubscribe();
            } finally {
                consumer.close();
            }
        } finally {
            if (statsService != null) {
                statsService.unregisterClientGroup(groupId);
            }
        }
    }

    @Override
    public boolean isLongPollingSupported() {
        return true;
    }

    /** Creates a new builder for this consumer. */
    public static <T extends TbQueueMsg> TbKafkaConsumerTemplateBuilder<T> builder() {
        return new TbKafkaConsumerTemplateBuilder<>();
    }

    /** Returns the configured consumer group id, or {@code null} if none was set. */
    public String getGroupId() {
        return groupId;
    }

    /**
     * Sets the function that supplies a start offset for a topic name. It is consulted when partitions
     * are assigned and {@code readFromBeginning} is false; a {@code null} result leaves the position untouched.
     */
    public void setStartOffsetProvider(Function<String, Long> startOffsetProvider) {
        this.startOffsetProvider = startOffsetProvider;
    }
}
