package org.thingsboard.server.edqs.processor;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.beans.ConstructorProperties;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ExceptionUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEvent;
import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.edqs.repo.EdqsRepository;
import org.thingsboard.server.edqs.state.EdqsPartitionService;
import org.thingsboard.server.edqs.state.EdqsStateService;
import org.thingsboard.server.edqs.util.EdqsMapper;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;

@EdqsComponent
@Service
/* loaded from: input_file:org/thingsboard/server/edqs/processor/EdqsProcessor.class */
public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> {
    private static final Logger log = LoggerFactory.getLogger(EdqsProcessor.class);
    private final EdqsQueueFactory queueFactory;
    private final EdqsMapper mapper;
    private final EdqsRepository repository;
    private final EdqsConfig config;
    private final EdqsExecutors edqsExecutors;
    private final EdqsPartitionService partitionService;
    private final DiscoveryService discoveryService;
    private final TopicService topicService;
    private final ConfigurableApplicationContext applicationContext;
    private final EdqsStateService stateService;
    private PartitionedQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> eventConsumer;
    private PartitionedQueueResponseTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> responseTemplate;
    private ListeningExecutorService requestExecutor;
    private VersionsStore versionsStore;
    private final AtomicInteger counter = new AtomicInteger();
    private Consumer<Throwable> errorHandler;

    @PostConstruct
    private void init() {
        this.errorHandler = th -> {
            if (th instanceof OutOfMemoryError) {
                log.error("OOM detected, shutting down");
                this.repository.clear();
                this.discoveryService.setReady(false);
                ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-shutdown"));
                ConfigurableApplicationContext configurableApplicationContext = this.applicationContext;
                Objects.requireNonNull(configurableApplicationContext);
                newSingleThreadExecutor.execute(configurableApplicationContext::close);
            }
        };
        this.requestExecutor = this.edqsExecutors.getRequestExecutor();
        this.versionsStore = new VersionsStore(this.config.getVersionsCacheTtl());
        this.eventConsumer = PartitionedQueueConsumerManager.create().queueKey(new QueueKey(ServiceType.EDQS, this.config.getEventsTopic())).topic(this.topicService.buildTopicName(this.config.getEventsTopic())).pollInterval(this.config.getPollInterval()).msgPackProcessor((list, tbQueueConsumer, queueConfig) -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                TbProtoQueueMsg tbProtoQueueMsg = (TbProtoQueueMsg) it.next();
                if (tbQueueConsumer.isStopped()) {
                    return;
                }
                try {
                    process((TransportProtos.ToEdqsMsg) tbProtoQueueMsg.getValue(), true);
                } catch (Exception e) {
                    log.error("Failed to process message: {}", tbProtoQueueMsg, e);
                }
            }
            tbQueueConsumer.commit();
        }).consumerCreator((queueConfig2, topicPartitionInfo) -> {
            return this.queueFactory.createEdqsEventsConsumer();
        }).queueAdmin(this.queueFactory.getEdqsQueueAdmin()).consumerExecutor(this.edqsExecutors.getConsumersExecutor()).taskExecutor(this.edqsExecutors.getConsumerTaskExecutor()).scheduler(this.edqsExecutors.getScheduler()).uncaughtErrorHandler(this.errorHandler).build();
        this.responseTemplate = this.queueFactory.createEdqsResponseTemplate(this);
        this.stateService.init(this.eventConsumer, List.of(this.responseTemplate.getRequestConsumer()));
    }

    @EventListener
    public void onPartitionsChange(PartitionChangeEvent partitionChangeEvent) {
        if (partitionChangeEvent.getServiceType() != ServiceType.EDQS) {
            return;
        }
        try {
            Set set = (Set) partitionChangeEvent.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
            this.stateService.process(TopicPartitionInfo.withTopic(set, this.topicService.buildTopicName(this.config.getStateTopic())));
            Set set2 = (Set) partitionChangeEvent.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
            if (CollectionsUtil.isNotEmpty(set2)) {
                Set set3 = (Set) Sets.difference(set2, set).stream().map(topicPartitionInfo -> {
                    return (Integer) topicPartitionInfo.getPartition().orElse(-1);
                }).collect(Collectors.toSet());
                if (set3.isEmpty()) {
                    return;
                }
                if (this.config.getPartitioningStrategy() == EdqsConfig.EdqsPartitioningStrategy.TENANT) {
                    this.repository.clearIf(tenantId -> {
                        return set3.contains(this.partitionService.resolvePartition(tenantId, null));
                    });
                } else {
                    log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", set3);
                }
            }
        } catch (Throwable th) {
            log.error("Failed to handle partition change event {}", partitionChangeEvent, th);
        }
    }

    public ListenableFuture<TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> handle(TbProtoQueueMsg<TransportProtos.ToEdqsMsg> tbProtoQueueMsg) {
        TransportProtos.ToEdqsMsg value = tbProtoQueueMsg.getValue();
        return this.requestExecutor.submit(() -> {
            try {
                return new TbProtoQueueMsg(tbProtoQueueMsg.getKey(), TransportProtos.FromEdqsMsg.newBuilder().setResponseMsg(TransportProtos.EdqsResponseMsg.newBuilder().setValue(JacksonUtil.toString(processRequest(getTenantId(value), getCustomerId(value), (EdqsRequest) Objects.requireNonNull((EdqsRequest) JacksonUtil.fromString(value.getRequestMsg().getValue(), EdqsRequest.class))))).build()).build(), tbProtoQueueMsg.getHeaders());
            } catch (Exception e) {
                log.error("Failed to parse request msg: {}", value, e);
                throw e;
            }
        });
    }

    private EdqsResponse processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest edqsRequest) {
        EdqsResponse edqsResponse = new EdqsResponse();
        try {
            if (edqsRequest.getEntityDataQuery() != null) {
                edqsResponse.setEntityDataQueryResult(this.repository.findEntityDataByQuery(tenantId, customerId, edqsRequest.getEntityDataQuery(), false).mapData((v0) -> {
                    return v0.toOldEntityData();
                }));
            } else if (edqsRequest.getEntityCountQuery() != null) {
                edqsResponse.setEntityCountQueryResult(Long.valueOf(this.repository.countEntitiesByQuery(tenantId, customerId, edqsRequest.getEntityCountQuery(), tenantId.isSysTenantId())));
            }
            log.trace("[{}] Request: {}, response: {}", new Object[]{tenantId, edqsRequest, edqsResponse});
        } catch (Throwable th) {
            log.error("[{}] Failed to process request: {}", new Object[]{tenantId, edqsRequest, th});
            edqsResponse.setError(ExceptionUtil.getMessage(th));
        }
        return edqsResponse;
    }

    public void process(TransportProtos.ToEdqsMsg toEdqsMsg, boolean z) {
        log.trace("Processing message: {}", toEdqsMsg);
        if (toEdqsMsg.hasEventMsg()) {
            TransportProtos.EdqsEventMsg eventMsg = toEdqsMsg.getEventMsg();
            TenantId tenantId = getTenantId(toEdqsMsg);
            ObjectType valueOf = ObjectType.valueOf(eventMsg.getObjectType());
            EdqsEventType valueOf2 = EdqsEventType.valueOf(eventMsg.getEventType());
            Long valueOf3 = eventMsg.hasVersion() ? Long.valueOf(eventMsg.getVersion()) : null;
            EdqsObject deserialize = this.mapper.deserialize(valueOf, eventMsg.getData().toByteArray(), false);
            if (valueOf3 != null) {
                if (!this.versionsStore.isNew(this.mapper.getKey(deserialize), valueOf3)) {
                    return;
                }
            } else if (!ObjectType.unversionedTypes.contains(valueOf)) {
                log.warn("[{}] {} doesn't have version: {}", new Object[]{tenantId, valueOf, deserialize});
            }
            if (z) {
                this.stateService.save(tenantId, valueOf, deserialize.stringKey(), valueOf2, toEdqsMsg);
            }
            int incrementAndGet = this.counter.incrementAndGet();
            if (incrementAndGet % 100000 == 0) {
                log.info("Processed {} events", Integer.valueOf(incrementAndGet));
            }
            EdqsEvent build = EdqsEvent.builder().tenantId(tenantId).objectType(valueOf).eventType(valueOf2).object(deserialize).build();
            log.debug("Processing event: {}", build);
            this.repository.processEvent(build);
        }
    }

    private TenantId getTenantId(TransportProtos.ToEdqsMsg toEdqsMsg) {
        return TenantId.fromUUID(new UUID(toEdqsMsg.getTenantIdMSB(), toEdqsMsg.getTenantIdLSB()));
    }

    private CustomerId getCustomerId(TransportProtos.ToEdqsMsg toEdqsMsg) {
        if (toEdqsMsg.getCustomerIdMSB() == 0 || toEdqsMsg.getCustomerIdLSB() == 0) {
            return null;
        }
        return new CustomerId(new UUID(toEdqsMsg.getCustomerIdMSB(), toEdqsMsg.getCustomerIdLSB()));
    }

    @PreDestroy
    public void destroy() throws InterruptedException {
        this.eventConsumer.stop();
        this.eventConsumer.awaitStop();
        this.responseTemplate.stop();
        this.stateService.stop();
    }

    @ConstructorProperties({"queueFactory", "mapper", "repository", "config", "edqsExecutors", "partitionService", "discoveryService", "topicService", "applicationContext", "stateService"})
    public EdqsProcessor(EdqsQueueFactory edqsQueueFactory, EdqsMapper edqsMapper, EdqsRepository edqsRepository, EdqsConfig edqsConfig, EdqsExecutors edqsExecutors, EdqsPartitionService edqsPartitionService, DiscoveryService discoveryService, TopicService topicService, ConfigurableApplicationContext configurableApplicationContext, EdqsStateService edqsStateService) {
        this.queueFactory = edqsQueueFactory;
        this.mapper = edqsMapper;
        this.repository = edqsRepository;
        this.config = edqsConfig;
        this.edqsExecutors = edqsExecutors;
        this.partitionService = edqsPartitionService;
        this.discoveryService = discoveryService;
        this.topicService = topicService;
        this.applicationContext = configurableApplicationContext;
        this.stateService = edqsStateService;
    }

    public Consumer<Throwable> getErrorHandler() {
        return this.errorHandler;
    }
}
