package org.thingsboard.server.edqs.state;

import java.beans.ConstructorProperties;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.edqs.processor.EdqsProcessor;
import org.thingsboard.server.edqs.processor.EdqsProducer;
import org.thingsboard.server.edqs.util.EdqsMapper;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;

@KafkaEdqsComponent
@Service
/* loaded from: input_file:org/thingsboard/server/edqs/state/KafkaEdqsStateService.class */
public class KafkaEdqsStateService implements EdqsStateService {
    private static final Logger log = LoggerFactory.getLogger(KafkaEdqsStateService.class);
    private final EdqsConfig config;
    private final EdqsPartitionService partitionService;
    private final KafkaEdqsQueueFactory queueFactory;
    private final DiscoveryService discoveryService;
    private final EdqsExecutors edqsExecutors;
    private final EdqsMapper mapper;
    private final TopicService topicService;

    @Autowired
    @Lazy
    private EdqsProcessor edqsProcessor;
    private PartitionedQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> stateConsumer;
    private KafkaQueueStateService<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> queueStateService;
    private QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> eventsToBackupConsumer;
    private EdqsProducer stateProducer;
    private VersionsStore versionsStore;
    private final AtomicInteger stateReadCount = new AtomicInteger();
    private final AtomicInteger eventsReadCount = new AtomicInteger();
    private boolean ready = false;

    @Override // org.thingsboard.server.edqs.state.EdqsStateService
    public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> partitionedQueueConsumerManager, List<PartitionedQueueConsumerManager<?>> list) {
        this.versionsStore = new VersionsStore(this.config.getVersionsCacheTtl());
        TbKafkaAdmin edqsQueueAdmin = this.queueFactory.getEdqsQueueAdmin();
        this.stateConsumer = PartitionedQueueConsumerManager.create().queueKey(new QueueKey(ServiceType.EDQS, this.config.getStateTopic())).topic(this.topicService.buildTopicName(this.config.getStateTopic())).pollInterval(this.config.getPollInterval()).msgPackProcessor((list2, tbQueueConsumer, queueConfig) -> {
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                TbProtoQueueMsg tbProtoQueueMsg = (TbProtoQueueMsg) it.next();
                try {
                    this.edqsProcessor.process(tbProtoQueueMsg.getValue(), false);
                    if (this.stateReadCount.incrementAndGet() % 100000 == 0) {
                        log.info("[state] Processed {} msgs", Integer.valueOf(this.stateReadCount.get()));
                    }
                } catch (Exception e) {
                    log.error("Failed to process message: {}", tbProtoQueueMsg, e);
                }
            }
            tbQueueConsumer.commit();
        }).consumerCreator((queueConfig2, topicPartitionInfo) -> {
            return this.queueFactory.createEdqsStateConsumer();
        }).queueAdmin(edqsQueueAdmin).consumerExecutor(this.edqsExecutors.getConsumersExecutor()).taskExecutor(this.edqsExecutors.getConsumerTaskExecutor()).scheduler(this.edqsExecutors.getScheduler()).uncaughtErrorHandler(this.edqsProcessor.getErrorHandler()).build();
        TbKafkaConsumerTemplate createEdqsEventsToBackupConsumer = this.queueFactory.createEdqsEventsToBackupConsumer();
        this.eventsToBackupConsumer = QueueConsumerManager.builder().name("edqs-events-to-backup-consumer").pollInterval(this.config.getPollInterval()).msgPackProcessor((list3, tbQueueConsumer2) -> {
            TransportProtos.ToEdqsMsg toEdqsMsg;
            Iterator it = list3.iterator();
            while (it.hasNext()) {
                TbProtoQueueMsg tbProtoQueueMsg = (TbProtoQueueMsg) it.next();
                if (tbQueueConsumer2.isStopped()) {
                    return;
                }
                try {
                    toEdqsMsg = (TransportProtos.ToEdqsMsg) tbProtoQueueMsg.getValue();
                    log.trace("Processing message: {}", toEdqsMsg);
                } catch (Throwable th) {
                    log.error("Failed to process message: {}", tbProtoQueueMsg, th);
                }
                if (toEdqsMsg.hasEventMsg()) {
                    TransportProtos.EdqsEventMsg eventMsg = toEdqsMsg.getEventMsg();
                    ObjectType valueOf = ObjectType.valueOf(eventMsg.getObjectType());
                    EdqsObject deserialize = this.mapper.deserialize(valueOf, eventMsg.getData().toByteArray(), true);
                    if (!eventMsg.hasVersion() || this.versionsStore.isNew(this.mapper.getKey(deserialize), Long.valueOf(eventMsg.getVersion()))) {
                        TenantId tenantId = getTenantId(toEdqsMsg);
                        log.trace("[{}] Saving to backup [{}] [{}] [{}]", new Object[]{tenantId, valueOf, EdqsEventType.valueOf(eventMsg.getEventType()), deserialize.stringKey()});
                        this.stateProducer.send(tenantId, valueOf, deserialize.stringKey(), toEdqsMsg);
                        int incrementAndGet = this.eventsReadCount.incrementAndGet();
                        if (incrementAndGet % 100000 == 0) {
                            log.info("[events-to-backup] Processed {} msgs", Integer.valueOf(incrementAndGet));
                        }
                    }
                }
            }
            tbQueueConsumer2.commit();
        }).consumerCreator(() -> {
            return createEdqsEventsToBackupConsumer;
        }).consumerExecutor(this.edqsExecutors.getConsumersExecutor()).threadPrefix("edqs-events-to-backup").build();
        this.stateProducer = EdqsProducer.builder().producer(this.queueFactory.createEdqsStateProducer()).partitionService(this.partitionService).build();
        this.queueStateService = KafkaQueueStateService.builder().eventConsumer(partitionedQueueConsumerManager).stateConsumer(this.stateConsumer).otherConsumers(list).eventsStartOffsetsProvider(() -> {
            HashMap hashMap = new HashMap();
            try {
                edqsQueueAdmin.getConsumerGroupOffsets(createEdqsEventsToBackupConsumer.getGroupId()).forEach((topicPartition, offsetAndMetadata) -> {
                    hashMap.put(topicPartition.topic(), Long.valueOf(offsetAndMetadata.offset()));
                });
            } catch (Exception e) {
                log.error("Failed to get consumer group offsets for {}", createEdqsEventsToBackupConsumer.getGroupId(), e);
            }
            return hashMap;
        }).build();
    }

    @Override // org.thingsboard.server.edqs.state.EdqsStateService
    public void process(Set<TopicPartitionInfo> set) {
        if (this.queueStateService.getPartitions().isEmpty()) {
            this.eventsToBackupConsumer.subscribe((Set) IntStream.range(0, this.config.getPartitions()).mapToObj(i -> {
                return TopicPartitionInfo.builder().topic(this.topicService.buildTopicName(this.config.getEventsTopic())).partition(Integer.valueOf(i)).build();
            }).collect(Collectors.toSet()));
            this.eventsToBackupConsumer.launch();
        }
        this.queueStateService.update(new QueueKey(ServiceType.EDQS), set, () -> {
            this.ready = true;
            this.discoveryService.setReady(true);
        });
    }

    @Override // org.thingsboard.server.edqs.state.EdqsStateService
    public void save(TenantId tenantId, ObjectType objectType, String str, EdqsEventType edqsEventType, TransportProtos.ToEdqsMsg toEdqsMsg) {
    }

    @Override // org.thingsboard.server.edqs.state.EdqsStateService
    public boolean isReady() {
        return this.ready;
    }

    private TenantId getTenantId(TransportProtos.ToEdqsMsg toEdqsMsg) {
        return TenantId.fromUUID(new UUID(toEdqsMsg.getTenantIdMSB(), toEdqsMsg.getTenantIdLSB()));
    }

    @Override // org.thingsboard.server.edqs.state.EdqsStateService
    public void stop() {
        this.stateConsumer.stop();
        this.stateConsumer.awaitStop();
        this.eventsToBackupConsumer.stop();
        this.stateProducer.stop();
    }

    @ConstructorProperties({"config", "partitionService", "queueFactory", "discoveryService", "edqsExecutors", "mapper", "topicService"})
    public KafkaEdqsStateService(EdqsConfig edqsConfig, EdqsPartitionService edqsPartitionService, KafkaEdqsQueueFactory kafkaEdqsQueueFactory, DiscoveryService discoveryService, EdqsExecutors edqsExecutors, EdqsMapper edqsMapper, TopicService topicService) {
        this.config = edqsConfig;
        this.partitionService = edqsPartitionService;
        this.queueFactory = kafkaEdqsQueueFactory;
        this.discoveryService = discoveryService;
        this.edqsExecutors = edqsExecutors;
        this.mapper = edqsMapper;
        this.topicService = topicService;
    }
}
