package org.thingsboard.server.service.ttl;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.state.DefaultDeviceStateService;

/**
 * Scheduled clean-up of per-edge Kafka "edge event notifications" topics.
 *
 * <p>On every run the service lists all Kafka topics, keeps those whose name starts with the
 * configured notifications-topic prefix, and parses the remainder of each name as
 * {@code <tenantId>.<edgeId>}. For every tenant found this way:
 * <ul>
 *   <li>if the tenant no longer exists, all of its edge topics and consumer groups are deleted;</li>
 *   <li>otherwise, for each edge: when the {@code LAST_CONNECT_TIME} server attribute is older than
 *       the TTL, the topic is deleted only if it is empty; in every other case (attribute missing,
 *       not a long, or not yet expired) the topic is deleted only if the edge itself cannot be
 *       found.</li>
 * </ul>
 */
@TbCoreComponent
@Service
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0")
public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
    private static final Logger log = LoggerFactory.getLogger(KafkaEdgeTopicsCleanUpService.class);

    private final TopicService topicService;
    private final TenantService tenantService;
    private final EdgeService edgeService;
    private final AttributesService attributesService;
    private final TbKafkaAdmin kafkaAdmin;

    /** Time-to-live of edge events, in seconds; compared against an edge's last connect time. */
    @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
    private long ttlSeconds;

    /** Base name of the edge event notifications topic; per-edge topics use it as a prefix. */
    @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}")
    private String tbEdgeEventNotificationsTopic;

    public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, TenantService tenantService, AttributesService attributesService, TopicService topicService, TbKafkaSettings tbKafkaSettings, TbKafkaTopicConfigs tbKafkaTopicConfigs) {
        super(partitionService);
        this.topicService = topicService;
        this.tenantService = tenantService;
        this.edgeService = edgeService;
        this.attributesService = attributesService;
        this.kafkaAdmin = new TbKafkaAdmin(tbKafkaSettings, tbKafkaTopicConfigs.getEdgeEventConfigs());
    }

    /**
     * Entry point invoked by the scheduler. Does nothing unless this node owns the system tenant
     * partition and Kafka reports at least one topic with the notifications prefix.
     */
    @Scheduled(initialDelayString = EdgeEventsCleanUpService.RANDOM_DELAY_INTERVAL_MS_EXPRESSION, fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
    public void cleanUp() {
        if (!isSystemTenantPartitionMine()) {
            return;
        }
        Set<String> allTopics = kafkaAdmin.getAllTopics();
        if (allTopics == null || allTopics.isEmpty()) {
            return;
        }

        String topicPrefix = topicService.buildTopicName(tbEdgeEventNotificationsTopic);
        List<String> matchingTopics = allTopics.stream()
                .filter(topic -> topic.startsWith(topicPrefix))
                .toList();
        if (matchingTopics.isEmpty()) {
            log.debug("No matching topics found with prefix [{}]. Skipping cleanup.", topicPrefix);
            return;
        }

        Map<TenantId, List<EdgeId>> edgesByTenant = extractTenantAndEdgeIds(matchingTopics, topicPrefix);
        long currentTimeMillis = System.currentTimeMillis();
        long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds);
        edgesByTenant.forEach((tenantId, edgeIds) ->
                processTenantCleanUp(tenantId, edgeIds, ttlMillis, currentTimeMillis));
    }

    /**
     * Cleans up the topics of a single tenant.
     *
     * @param tenantId          tenant parsed from the topic names
     * @param edgeIds           edges of that tenant that currently have a topic
     * @param ttlMillis         TTL in milliseconds
     * @param currentTimeMillis timestamp taken at the start of the clean-up run
     */
    private void processTenantCleanUp(TenantId tenantId, List<EdgeId> edgeIds, long ttlMillis, long currentTimeMillis) {
        if (!tenantService.tenantExists(tenantId)) {
            for (EdgeId edgeId : edgeIds) {
                deleteTopicAndConsumerGroup(buildEdgeTopic(tenantId, edgeId));
            }
            log.info("[{}] Removed topics for not existing tenant and edges {}", tenantId, edgeIds);
            return;
        }

        for (EdgeId edgeId : edgeIds) {
            try {
                attributesService.find(tenantId, edgeId, AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME)
                        .get()
                        .flatMap(attribute -> attribute.getLongValue())
                        .filter(lastConnectTime -> isTopicExpired(lastConnectTime, ttlMillis, currentTimeMillis))
                        .ifPresentOrElse(
                                lastConnectTime -> removeExpiredTopicIfEmpty(tenantId, edgeId, ttlMillis, currentTimeMillis),
                                () -> removeTopicIfEdgeDeleted(tenantId, edgeId));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the scheduler/executor can observe the shutdown
                // request, and stop working on the remaining edges of this tenant.
                Thread.currentThread().interrupt();
                log.warn("[{}] Interrupted while cleaning up topic for edge {}", tenantId, edgeId);
                return;
            } catch (Exception e) {
                // Best effort: a failure for one edge must not prevent clean-up of the others.
                log.error("[{}] Failed to delete topic for edge {}", tenantId, edgeId, e);
            }
        }
    }

    /** Deletes the edge's topic when the edge is past its TTL and the topic holds no messages. */
    private void removeExpiredTopicIfEmpty(TenantId tenantId, EdgeId edgeId, long ttlMillis, long currentTimeMillis) {
        String topic = buildEdgeTopic(tenantId, edgeId);
        if (kafkaAdmin.isTopicEmpty(topic)) {
            deleteTopicAndConsumerGroup(topic);
            log.info("[{}] Removed outdated topic {} for edge {} older than {}",
                    tenantId, topic, edgeId, Date.from(Instant.ofEpochMilli(currentTimeMillis - ttlMillis)));
        }
    }

    /** Deletes the edge's topic when the edge can no longer be found for the tenant. */
    private void removeTopicIfEdgeDeleted(TenantId tenantId, EdgeId edgeId) {
        if (edgeService.findEdgeById(tenantId, edgeId) == null) {
            String topic = buildEdgeTopic(tenantId, edgeId);
            deleteTopicAndConsumerGroup(topic);
            log.info("[{}] Removed topic {} for deleted edge {}", tenantId, topic, edgeId);
        }
    }

    /** Resolves the full Kafka topic name used for the given tenant/edge pair. */
    private String buildEdgeTopic(TenantId tenantId, EdgeId edgeId) {
        return topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
    }

    /** Deletes the topic and then the consumer group that carries the same name. */
    private void deleteTopicAndConsumerGroup(String topic) {
        kafkaAdmin.deleteTopic(topic);
        kafkaAdmin.deleteConsumerGroup(topic);
    }

    /**
     * Tells whether more than {@code ttlMillis} have elapsed since {@code lastConnectTime}.
     *
     * <p>Written as a subtraction rather than {@code lastConnectTime + ttlMillis < now} so that a
     * very large TTL cannot overflow {@code long} and make every topic look expired.
     */
    private boolean isTopicExpired(long lastConnectTime, long ttlMillis, long currentTimeMillis) {
        return currentTimeMillis - lastConnectTime > ttlMillis;
    }

    /**
     * Groups edge ids by tenant based on topic names of the form
     * {@code <prefix>.<tenantUuid>.<edgeUuid>[...]}. Topics that do not follow this layout are
     * logged and skipped.
     *
     * @param topics      topic names that start with {@code topicPrefix}
     * @param topicPrefix fully built notifications topic prefix
     * @return mutable map from tenant to the edges that have a topic; never contains empty lists
     */
    private Map<TenantId, List<EdgeId>> extractTenantAndEdgeIds(List<String> topics, String topicPrefix) {
        Map<TenantId, List<EdgeId>> edgesByTenant = new HashMap<>();
        for (String topic : topics) {
            try {
                // Skip the prefix and the separator that follows it.
                String[] parts = topic.substring(topicPrefix.length() + 1).split("\\.");
                // Parse both ids before touching the map, so a malformed edge id cannot leave
                // an empty list registered for the tenant.
                TenantId tenantId = TenantId.fromUUID(UUID.fromString(parts[0]));
                EdgeId edgeId = new EdgeId(UUID.fromString(parts[1]));
                edgesByTenant.computeIfAbsent(tenantId, key -> new ArrayList<>()).add(edgeId);
            } catch (Exception e) {
                log.warn("Failed to extract TenantId and EdgeId from topic [{}]", topic, e);
            }
        }
        return edgesByTenant;
    }
}
