/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.kafka;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatisticConfig;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.util.TbKafkaComponent;

@Component
@TbKafkaComponent
public class TbKafkaConsumerStatsService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TbKafkaConsumerStatsService.class);
    private final Set<String> monitoredGroups = ConcurrentHashMap.newKeySet();
    private final TbKafkaSettings kafkaSettings;
    private final KafkaAdmin kafkaAdmin;
    private final TbKafkaConsumerStatisticConfig statsConfig;
    private Consumer<String, byte[]> consumer;
    private ScheduledExecutorService statsPrintScheduler;

    @PostConstruct
    public void init() {
        if (!this.statsConfig.getEnabled().booleanValue()) {
            return;
        }
        this.statsPrintScheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor((String)"kafka-consumer-stats");
        Properties consumerProps = this.kafkaSettings.toConsumerProps(null);
        consumerProps.put("client.id", "consumer-stats-loader-client");
        consumerProps.put("group.id", "consumer-stats-loader-client-group");
        this.consumer = new KafkaConsumer(consumerProps);
        this.startLogScheduling();
    }

    private void startLogScheduling() {
        Duration timeoutDuration = Duration.ofMillis(this.statsConfig.getKafkaResponseTimeoutMs());
        this.statsPrintScheduler.scheduleWithFixedDelay(() -> {
            if (!this.isStatsPrintRequired()) {
                return;
            }
            for (String groupId : this.monitoredGroups) {
                try {
                    Map endOffsets;
                    Map groupOffsets = (Map)this.kafkaSettings.getAdmin().getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(this.statsConfig.getKafkaResponseTimeoutMs().longValue(), TimeUnit.MILLISECONDS);
                    List<GroupTopicStats> lagTopicsStats = this.getTopicsStatsWithLag(groupOffsets, endOffsets = this.consumer.endOffsets(groupOffsets.keySet(), timeoutDuration));
                    if (lagTopicsStats.isEmpty()) continue;
                    StringBuilder builder = new StringBuilder();
                    for (int i = 0; i < lagTopicsStats.size(); ++i) {
                        builder.append(lagTopicsStats.get(i).toString());
                        if (i == lagTopicsStats.size() - 1) continue;
                        builder.append(", ");
                    }
                    log.info("[{}] Topic partitions with lag: [{}].", (Object)groupId, (Object)builder.toString());
                }
                catch (Exception e) {
                    log.warn("[{}] Failed to get consumer group stats", (Object)groupId, (Object)e);
                }
            }
        }, this.statsConfig.getPrintIntervalMs(), this.statsConfig.getPrintIntervalMs(), TimeUnit.MILLISECONDS);
    }

    private boolean isStatsPrintRequired() {
        return log.isInfoEnabled();
    }

    private List<GroupTopicStats> getTopicsStatsWithLag(Map<TopicPartition, OffsetAndMetadata> groupOffsets, Map<TopicPartition, Long> endOffsets) {
        ArrayList<GroupTopicStats> consumerGroupStats = new ArrayList<GroupTopicStats>();
        for (TopicPartition topicPartition : groupOffsets.keySet()) {
            long committedOffset;
            long endOffset = endOffsets.get(topicPartition);
            long lag = endOffset - (committedOffset = groupOffsets.get(topicPartition).offset());
            if (lag == 0L) continue;
            GroupTopicStats groupTopicStats = GroupTopicStats.builder().topic(topicPartition.topic()).partition(topicPartition.partition()).committedOffset(committedOffset).endOffset(endOffset).lag(lag).build();
            consumerGroupStats.add(groupTopicStats);
        }
        return consumerGroupStats;
    }

    public void registerClientGroup(String groupId) {
        if (this.statsConfig.getEnabled().booleanValue() && !StringUtils.isEmpty((String)groupId)) {
            this.monitoredGroups.add(groupId);
        }
    }

    public void unregisterClientGroup(String groupId) {
        if (this.statsConfig.getEnabled().booleanValue() && !StringUtils.isEmpty((String)groupId)) {
            this.monitoredGroups.remove(groupId);
        }
    }

    @PreDestroy
    public void destroy() {
        if (this.statsPrintScheduler != null) {
            this.statsPrintScheduler.shutdownNow();
        }
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    @ConstructorProperties(value={"kafkaSettings", "kafkaAdmin", "statsConfig"})
    @Generated
    public TbKafkaConsumerStatsService(TbKafkaSettings kafkaSettings, KafkaAdmin kafkaAdmin, TbKafkaConsumerStatisticConfig statsConfig) {
        this.kafkaSettings = kafkaSettings;
        this.kafkaAdmin = kafkaAdmin;
        this.statsConfig = statsConfig;
    }

    private static class GroupTopicStats {
        private String topic;
        private int partition;
        private long committedOffset;
        private long endOffset;
        private long lag;

        public String toString() {
            return "[topic=[" + this.topic + "], partition=[" + this.partition + "], committedOffset=[" + this.committedOffset + "], endOffset=[" + this.endOffset + "], lag=[" + this.lag + "]]";
        }

        @ConstructorProperties(value={"topic", "partition", "committedOffset", "endOffset", "lag"})
        @Generated
        GroupTopicStats(String topic, int partition, long committedOffset, long endOffset, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.committedOffset = committedOffset;
            this.endOffset = endOffset;
            this.lag = lag;
        }

        @Generated
        public static GroupTopicStatsBuilder builder() {
            return new GroupTopicStatsBuilder();
        }

        @Generated
        public String getTopic() {
            return this.topic;
        }

        @Generated
        public int getPartition() {
            return this.partition;
        }

        @Generated
        public long getCommittedOffset() {
            return this.committedOffset;
        }

        @Generated
        public long getEndOffset() {
            return this.endOffset;
        }

        @Generated
        public long getLag() {
            return this.lag;
        }

        @Generated
        public void setTopic(String topic) {
            this.topic = topic;
        }

        @Generated
        public void setPartition(int partition) {
            this.partition = partition;
        }

        @Generated
        public void setCommittedOffset(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        @Generated
        public void setEndOffset(long endOffset) {
            this.endOffset = endOffset;
        }

        @Generated
        public void setLag(long lag) {
            this.lag = lag;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof GroupTopicStats)) {
                return false;
            }
            GroupTopicStats other = (GroupTopicStats)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getPartition() != other.getPartition()) {
                return false;
            }
            if (this.getCommittedOffset() != other.getCommittedOffset()) {
                return false;
            }
            if (this.getEndOffset() != other.getEndOffset()) {
                return false;
            }
            if (this.getLag() != other.getLag()) {
                return false;
            }
            String this$topic = this.getTopic();
            String other$topic = other.getTopic();
            return !(this$topic == null ? other$topic != null : !this$topic.equals(other$topic));
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof GroupTopicStats;
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            result = result * 59 + this.getPartition();
            long $committedOffset = this.getCommittedOffset();
            result = result * 59 + (int)($committedOffset >>> 32 ^ $committedOffset);
            long $endOffset = this.getEndOffset();
            result = result * 59 + (int)($endOffset >>> 32 ^ $endOffset);
            long $lag = this.getLag();
            result = result * 59 + (int)($lag >>> 32 ^ $lag);
            String $topic = this.getTopic();
            result = result * 59 + ($topic == null ? 43 : $topic.hashCode());
            return result;
        }

        @Generated
        public static class GroupTopicStatsBuilder {
            @Generated
            private String topic;
            @Generated
            private int partition;
            @Generated
            private long committedOffset;
            @Generated
            private long endOffset;
            @Generated
            private long lag;

            @Generated
            GroupTopicStatsBuilder() {
            }

            @Generated
            public GroupTopicStatsBuilder topic(String topic) {
                this.topic = topic;
                return this;
            }

            @Generated
            public GroupTopicStatsBuilder partition(int partition) {
                this.partition = partition;
                return this;
            }

            @Generated
            public GroupTopicStatsBuilder committedOffset(long committedOffset) {
                this.committedOffset = committedOffset;
                return this;
            }

            @Generated
            public GroupTopicStatsBuilder endOffset(long endOffset) {
                this.endOffset = endOffset;
                return this;
            }

            @Generated
            public GroupTopicStatsBuilder lag(long lag) {
                this.lag = lag;
                return this;
            }

            @Generated
            public GroupTopicStats build() {
                return new GroupTopicStats(this.topic, this.partition, this.committedOffset, this.endOffset, this.lag);
            }

            @Generated
            public String toString() {
                return "TbKafkaConsumerStatsService.GroupTopicStats.GroupTopicStatsBuilder(topic=" + this.topic + ", partition=" + this.partition + ", committedOffset=" + this.committedOffset + ", endOffset=" + this.endOffset + ", lag=" + this.lag + ")";
            }
        }
    }
}

