/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.kafka;

import jakarta.annotation.PreDestroy;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.CachedValue;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.util.TbKafkaComponent;

/**
 * Administrative helper around a lazily created Kafka {@link AdminClient}.
 *
 * <p>Offers topic creation/deletion, topic listing (with a TTL based cache of known topic
 * names), consumer group lag calculation, consumer group offset migration and topic emptiness
 * checks. Every blocking Kafka call is bounded by {@code queue.kafka.request.timeout.ms}.
 */
@TbKafkaComponent
@Component
public class KafkaAdmin {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaAdmin.class);

    /** Entry of the topic properties map that carries the partition count instead of a topic config. */
    private static final String NUM_PARTITIONS_SETTING = "partitions";

    private final TbKafkaSettings settings;
    @Value(value="${queue.kafka.request.timeout.ms:30000}")
    private int requestTimeoutMs;
    @Value(value="${queue.kafka.topics_cache_ttl_ms:300000}")
    private int topicsCacheTtlMs;
    private final LazyInitializer<AdminClient> adminClient;
    /**
     * Cache of known topic names. It is created lazily because {@link #topicsCacheTtlMs} is
     * injected into the field only after the constructor has returned; building the cache inside
     * the constructor would always read the default value {@code 0} instead of the configured TTL.
     */
    private final LazyInitializer<CachedValue<Set<String>>> topics;

    /**
     * Creates the component. Neither the admin client nor the topics cache is created here;
     * both are initialized on first use.
     *
     * @param settings Kafka settings used to build the admin client properties and to obtain
     *                 the replication factor for new topics
     */
    public KafkaAdmin(@Lazy TbKafkaSettings settings) {
        this.settings = settings;
        this.adminClient = LazyInitializer.<AdminClient>builder()
                .setInitializer(() -> AdminClient.create((Properties) settings.toAdminProps()))
                .get();
        this.topics = LazyInitializer.<CachedValue<Set<String>>>builder()
                .setInitializer(() -> new CachedValue<Set<String>>(this::loadTopics, (long) this.topicsCacheTtlMs))
                .get();
    }

    /**
     * Creates the topic unless it is already present in the local topics cache.
     *
     * <p>The {@code "partitions"} entry of {@code properties} is interpreted as the partition
     * count (default {@code 1}); all remaining entries are passed to Kafka as topic configs.
     * The supplied map is never modified. A {@link TopicExistsException} reported by Kafka is
     * treated as success.
     *
     * @param topic      name of the topic to create
     * @param properties topic properties, may be {@code null} or immutable
     * @param force      when {@code true} the creation request is sent even if the topic is cached
     * @throws NumberFormatException if the {@code "partitions"} value is not an integer
     * @throws RuntimeException      if the creation fails for any reason other than the topic
     *                               already existing
     */
    public void createTopicIfNotExists(String topic, Map<String, String> properties, boolean force) {
        Set<String> knownTopics = getTopics();
        if (!force && knownTopics.contains(topic)) {
            log.trace("Topic {} already present in cache", topic);
            return;
        }
        log.debug("Creating topic {} with properties {}", topic, properties);

        // Work on a private copy: removing the partition count from the caller's map would fail
        // for immutable maps and would make a retry with the same map lose the partition count.
        Map<String, String> configs = new HashMap<>();
        if (properties != null) {
            configs.putAll(properties);
        }
        String numPartitionsStr = configs.remove(NUM_PARTITIONS_SETTING);
        int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1;
        NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(configs);
        try {
            getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            knownTopics.add(topic);
        } catch (ExecutionException ee) {
            log.trace("Failed to create topic {} with properties {}", topic, configs, ee);
            if (ee.getCause() instanceof TopicExistsException) {
                // The broker confirmed the topic exists; remember it so later calls skip the request.
                knownTopics.add(topic);
                return;
            }
            log.warn("[{}] Failed to create topic", topic, ee);
            throw new RuntimeException(ee);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.warn("[{}] Failed to create topic", topic, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the topic and evicts it from the local topics cache. Failures are logged and
     * not propagated.
     *
     * @param topic name of the topic to delete
     */
    public void deleteTopic(String topic) {
        log.debug("Deleting topic {}", topic);
        try {
            getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            // Without the eviction a later non-forced createTopicIfNotExists would be skipped
            // for as long as the stale name stays cached.
            getTopics().remove(topic);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Failed to delete kafka topic [{}].", topic, e);
        }
    }

    /** Returns the cached, mutable set of known topic names, creating the cache on first use. */
    private Set<String> getTopics() {
        try {
            return this.topics.get().get();
        } catch (ConcurrentException e) {
            throw new RuntimeException("Failed to initialize Kafka topics cache", e);
        }
    }

    /** Cache loader: copies the currently listed topic names into a concurrent, mutable set. */
    private Set<String> loadTopics() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        names.addAll(listTopics());
        return names;
    }

    /**
     * Lists topic names directly from Kafka, bypassing the cache.
     *
     * @return the topic names, or an empty set if the request fails (the failure is logged)
     */
    public Set<String> listTopics() {
        try {
            Set<String> names = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            log.trace("Listed topics: {}", names);
            return names;
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Failed to get all topics.", e);
            return Collections.emptySet();
        }
    }

    /**
     * Computes the total lag for each of the given consumer groups.
     *
     * @param groupIds consumer group ids
     * @return map from group id to the value of {@link #getTotalConsumerGroupLag(String)}
     */
    public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
        Map<String, Long> lagByGroup = new HashMap<>();
        for (String groupId : groupIds) {
            lagByGroup.put(groupId, getTotalConsumerGroupLag(groupId));
        }
        return lagByGroup;
    }

    /**
     * Sums, over all partitions the group has a committed offset for, the distance between the
     * latest offset of the partition and the committed offset.
     *
     * @param groupId consumer group id
     * @return the total lag; {@code 0} when the group has no committed offsets or when the
     *         lookup fails (the failure is logged)
     */
    public long getTotalConsumerGroupLag(String groupId) {
        try {
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = getConsumerGroupOffsets(groupId);
            Map<TopicPartition, OffsetSpec> latestOffsetsSpec = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
                // Kafka reports partitions without a committed offset with a null value.
                if (entry.getValue() != null) {
                    latestOffsetsSpec.put(entry.getKey(), OffsetSpec.latest());
                }
            }
            if (latestOffsetsSpec.isEmpty()) {
                return 0L;
            }
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            long totalLag = 0L;
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
                OffsetAndMetadata committed = entry.getValue();
                ListOffsetsResult.ListOffsetsResultInfo end = endOffsets.get(entry.getKey());
                if (committed == null || end == null) {
                    // Nothing to compare against; assuming offset 0 would produce a negative lag.
                    continue;
                }
                totalLag += Math.max(0L, end.offset() - committed.offset());
            }
            return totalLag;
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Failed to get total lag for consumer group: {}", groupId, e);
            return 0L;
        }
    }

    /**
     * Fetches the committed offsets of a consumer group.
     *
     * @param groupId consumer group id
     * @return committed offsets per partition; values may be {@code null} for partitions
     *         without a committed offset
     * @throws RuntimeException if the request fails, times out or the thread is interrupted
     *                          (the interrupt flag is restored in the latter case)
     */
    public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
        try {
            return fetchConsumerGroupOffsets(groupId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while fetching offsets of consumer group " + groupId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException("Failed to fetch offsets of consumer group " + groupId, e);
        }
    }

    /** Fetches the committed offsets of a consumer group, exposing the checked Kafka future exceptions. */
    private Map<TopicPartition, OffsetAndMetadata> fetchConsumerGroupOffsets(String groupId)
            throws ExecutionException, InterruptedException, TimeoutException {
        return getClient().listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get(requestTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Best-effort variant of {@link #syncOffsetsUnsafe(String, String, String)} that uses
     * {@code "." + partitionId} as the topic suffix. Does nothing when {@code partitionId} is
     * {@code null}; failures are logged and not propagated.
     *
     * @param fatGroupId  consumer group to copy the offset from
     * @param newGroupId  consumer group to copy the offset to
     * @param partitionId partition id that forms the topic suffix, may be {@code null}
     */
    public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
        try {
            log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
            if (partitionId == null) {
                return;
            }
            syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
        }
    }

    /**
     * Copies the committed offset of the first partition of {@code fatGroupId} whose topic name
     * ends with {@code topicSuffix} (and that actually has a committed offset) to
     * {@code newGroupId}, unless the new group already has an equal or greater offset for that
     * partition. At most one partition is processed per call.
     *
     * @param fatGroupId  consumer group to copy the offset from
     * @param newGroupId  consumer group to copy the offset to
     * @param topicSuffix suffix a topic name must end with to be considered
     * @throws ExecutionException   if a Kafka request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws TimeoutException     if a Kafka request exceeds the request timeout
     */
    public void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException {
        // The private fetch is used so that the declared checked exceptions keep reaching callers.
        Map<TopicPartition, OffsetAndMetadata> oldOffsets = fetchConsumerGroupOffsets(fatGroupId);
        if (oldOffsets.isEmpty()) {
            return;
        }
        for (Map.Entry<TopicPartition, OffsetAndMetadata> consumerOffset : oldOffsets.entrySet()) {
            TopicPartition tp = consumerOffset.getKey();
            if (!tp.topic().endsWith(topicSuffix)) {
                continue;
            }
            OffsetAndMetadata om = consumerOffset.getValue();
            if (om == null) {
                // The old group has no committed offset for this partition: nothing to copy.
                continue;
            }
            Map<TopicPartition, OffsetAndMetadata> newOffsets = fetchConsumerGroupOffsets(newGroupId);
            OffsetAndMetadata existingOffset = newOffsets.get(tp);
            if (existingOffset == null) {
                log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
            } else if (existingOffset.offset() >= om.offset()) {
                log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
                break;
            } else {
                log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
            }
            getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
            break;
        }
    }

    /**
     * Checks whether a single topic is empty.
     *
     * @param topic topic name
     * @return the result of {@link #areAllTopicsEmpty(Set)} for a set containing only {@code topic}
     */
    public boolean isTopicEmpty(String topic) {
        return areAllTopicsEmpty(Set.of(topic));
    }

    /**
     * Checks whether every partition of the given topics has equal earliest and latest offsets.
     * Topics that are not present in the topics cache are ignored.
     *
     * @param topics topic names to check
     * @return {@code true} if none of the topics is cached as existing or all their partitions
     *         are empty; {@code false} if any partition holds records or the check fails
     *         (the failure is logged)
     */
    public boolean areAllTopicsEmpty(Set<String> topics) {
        try {
            List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
            if (existingTopics.isEmpty()) {
                return true;
            }
            Map<String, TopicDescription> descriptions =
                    getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
            List<TopicPartition> allPartitions = descriptions.entrySet().stream()
                    .flatMap(entry -> entry.getValue().partitions().stream()
                            .map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))
                    .toList();
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets =
                    fetchOffsets(allPartitions, OffsetSpec.earliest());
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    fetchOffsets(allPartitions, OffsetSpec.latest());
            for (TopicPartition partition : allPartitions) {
                long beginningOffset = beginningOffsets.get(partition).offset();
                long endOffset = endOffsets.get(partition).offset();
                if (beginningOffset != endOffset) {
                    log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Failed to check if topics [{}] empty.", topics, e);
            return false;
        }
    }

    /** Requests the offsets described by {@code spec} for every given partition. */
    private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> fetchOffsets(List<TopicPartition> partitions, OffsetSpec spec)
            throws ExecutionException, InterruptedException, TimeoutException {
        Map<TopicPartition, OffsetSpec> request = partitions.stream()
                .collect(Collectors.toMap(partition -> partition, partition -> spec));
        return getClient().listOffsets(request).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Deletes a consumer group. Failures are logged and not propagated.
     *
     * @param consumerGroupId id of the consumer group to delete
     */
    public void deleteConsumerGroup(String consumerGroupId) {
        try {
            getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.warn("Failed to delete consumer group {}", consumerGroupId, e);
        }
    }

    /**
     * Returns the shared admin client, creating it on first use.
     *
     * @return the Kafka admin client
     * @throws RuntimeException if the client cannot be initialized
     */
    public AdminClient getClient() {
        try {
            return this.adminClient.get();
        } catch (ConcurrentException e) {
            throw new RuntimeException("Failed to initialize Kafka admin client", e);
        }
    }

    /** Closes the admin client on shutdown, but only if it was ever created. */
    @PreDestroy
    private void destroy() throws Exception {
        if (this.adminClient.isInitialized()) {
            this.adminClient.get().close();
        }
    }

    /** Re-sets the thread's interrupt flag when a broad catch block swallowed an interruption. */
    private static void restoreInterruptIfNeeded(Throwable error) {
        if (error instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

