package org.thingsboard.server.service.queue;

import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;

@Service
@TbRuleEngineComponent
/* loaded from: input_file:org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.class */
public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService<TransportProtos.ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {

    @Value("${queue.calculated_fields.poll_interval:25}")
    private long pollInterval;

    @Value("${queue.calculated_fields.pack_processing_timeout:60000}")
    private long packProcessingTimeout;
    private final TbRuleEngineQueueFactory queueFactory;
    private final CalculatedFieldStateService stateService;

    public DefaultTbCalculatedFieldConsumerService(TbRuleEngineQueueFactory tbRuleEngineQueueFactory, ActorSystemContext actorSystemContext, TbDeviceProfileCache tbDeviceProfileCache, TbAssetProfileCache tbAssetProfileCache, TbTenantProfileCache tbTenantProfileCache, TbApiUsageStateService tbApiUsageStateService, PartitionService partitionService, ApplicationEventPublisher applicationEventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache, CalculatedFieldStateService calculatedFieldStateService) {
        super(actorSystemContext, tbTenantProfileCache, tbDeviceProfileCache, tbAssetProfileCache, calculatedFieldCache, tbApiUsageStateService, partitionService, applicationEventPublisher, jwtSettingsService);
        this.queueFactory = tbRuleEngineQueueFactory;
        this.stateService = calculatedFieldStateService;
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService
    protected void onStartUp() {
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "CalculatedFields");
        this.stateService.init(PartitionedQueueConsumerManager.create().queueKey(queueKey).topic(this.partitionService.getTopic(queueKey)).pollInterval(this.pollInterval).msgPackProcessor(this::processMsgs).consumerCreator((queueConfig, topicPartitionInfo) -> {
            return this.queueFactory.createToCalculatedFieldMsgConsumer(topicPartitionInfo);
        }).queueAdmin(this.queueFactory.getCalculatedFieldQueueAdmin()).consumerExecutor(this.consumersExecutor).scheduler(this.scheduler).taskExecutor(this.mgmtExecutor).build());
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    @PreDestroy
    public void destroy() {
        super.destroy();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    public void startConsumers() {
        super.startConsumers();
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService
    protected void onPartitionChangeEvent(PartitionChangeEvent partitionChangeEvent) {
        try {
            partitionChangeEvent.getNewPartitions().forEach((queueKey, set) -> {
                if ("CalculatedFields".equals(queueKey.getQueueName())) {
                    this.stateService.restore(queueKey, set);
                }
            });
            this.actorContext.tell(new CalculatedFieldPartitionChangeMsg());
        } catch (Throwable th) {
            this.log.error("Failed to process partition change event: {}", partitionChangeEvent, th);
        }
    }

    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> tbQueueConsumer, QueueConfig queueConfig) throws Exception {
        List list2 = list.stream().map(tbProtoQueueMsg -> {
            return new IdMsgPair(UUID.randomUUID(), tbProtoQueueMsg);
        }).toList();
        ConcurrentMap concurrentMap = (ConcurrentMap) list2.stream().collect(Collectors.toConcurrentMap((v0) -> {
            return v0.getUuid();
        }, (v0) -> {
            return v0.getMsg();
        }));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        TbPackProcessingContext tbPackProcessingContext = new TbPackProcessingContext(countDownLatch, concurrentMap, new ConcurrentHashMap());
        PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
        Future<?> submit = this.consumersExecutor.submit(() -> {
            list2.forEach(idMsgPair -> {
                UUID uuid = idMsgPair.getUuid();
                TbProtoQueueMsg msg = idMsgPair.getMsg();
                this.log.trace("[{}] Creating main callback for message: {}", uuid, msg.getValue());
                TbPackCallback tbPackCallback = new TbPackCallback(uuid, tbPackProcessingContext);
                try {
                    TransportProtos.ToCalculatedFieldMsg value = msg.getValue();
                    pendingMsgHolder.setMsg(value);
                    if (value.hasTelemetryMsg()) {
                        this.log.trace("[{}] Forwarding regular telemetry message for processing {}", uuid, value.getTelemetryMsg());
                        forwardToActorSystem(value.getTelemetryMsg(), tbPackCallback);
                    } else if (value.hasLinkedTelemetryMsg()) {
                        forwardToActorSystem(value.getLinkedTelemetryMsg(), tbPackCallback);
                    }
                } catch (Throwable th) {
                    this.log.warn("[{}] Failed to process message: {}", new Object[]{uuid, msg, th});
                    tbPackCallback.onFailure(th);
                }
            });
        });
        if (!countDownLatch.await(this.packProcessingTimeout, TimeUnit.MILLISECONDS)) {
            if (!submit.isDone()) {
                submit.cancel(true);
                this.log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
            }
            tbPackProcessingContext.getAckMap().forEach((uuid, tbProtoQueueMsg2) -> {
                this.log.warn("[{}] Timeout to process message: {}", uuid, tbProtoQueueMsg2.getValue());
            });
            tbPackProcessingContext.getFailedMap().forEach((uuid2, tbProtoQueueMsg3) -> {
                this.log.warn("[{}] Failed to process message: {}", uuid2, tbProtoQueueMsg3.getValue());
            });
        }
        tbQueueConsumer.commit();
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    protected ServiceType getServiceType() {
        return ServiceType.TB_RULE_ENGINE;
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService
    protected String getPrefix() {
        return "tb-cf";
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    protected long getNotificationPollDuration() {
        return this.pollInterval;
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    protected long getNotificationPackProcessingTimeout() {
        return this.packProcessingTimeout;
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    protected int getMgmtThreadPoolSize() {
        return Math.max(Runtime.getRuntime().availableProcessors(), 4);
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>> createNotificationsConsumer() {
        return this.queueFactory.createToCalculatedFieldNotificationMsgConsumer();
    }

    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    protected void handleNotification(UUID uuid, TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg> tbProtoQueueMsg, TbCallback tbCallback) {
        TransportProtos.ToCalculatedFieldNotificationMsg value = tbProtoQueueMsg.getValue();
        if (value.hasLinkedTelemetryMsg()) {
            forwardToActorSystem(value.getLinkedTelemetryMsg(), tbCallback);
        }
    }

    @EventListener
    public void handleComponentLifecycleEvent(ComponentLifecycleMsg componentLifecycleMsg) {
        if (componentLifecycleMsg.getEntityId().getEntityType() == EntityType.TENANT && componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) {
            Set<TopicPartitionInfo> partitions = this.stateService.getPartitions();
            if (CollectionUtils.isEmpty(partitions)) {
                return;
            }
            this.stateService.delete((Set) partitions.stream().filter(topicPartitionInfo -> {
                return topicPartitionInfo.getTenantId().isPresent() && ((TenantId) topicPartitionInfo.getTenantId().get()).equals(componentLifecycleMsg.getTenantId());
            }).collect(Collectors.toSet()));
        }
    }

    private void forwardToActorSystem(TransportProtos.CalculatedFieldTelemetryMsgProto calculatedFieldTelemetryMsgProto, TbCallback tbCallback) {
        this.actorContext.tell(new CalculatedFieldTelemetryMsg(toTenantId(calculatedFieldTelemetryMsgProto.getTenantIdMSB(), calculatedFieldTelemetryMsgProto.getTenantIdLSB()), EntityIdFactory.getByTypeAndUuid(calculatedFieldTelemetryMsgProto.getEntityType(), new UUID(calculatedFieldTelemetryMsgProto.getEntityIdMSB(), calculatedFieldTelemetryMsgProto.getEntityIdLSB())), calculatedFieldTelemetryMsgProto, tbCallback));
    }

    private void forwardToActorSystem(TransportProtos.CalculatedFieldLinkedTelemetryMsgProto calculatedFieldLinkedTelemetryMsgProto, TbCallback tbCallback) {
        TransportProtos.CalculatedFieldTelemetryMsgProto msg = calculatedFieldLinkedTelemetryMsgProto.getMsg();
        this.actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()), EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())), calculatedFieldLinkedTelemetryMsgProto, tbCallback));
    }

    private TenantId toTenantId(long j, long j2) {
        return TenantId.fromUUID(new UUID(j, j2));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.thingsboard.server.service.queue.processing.AbstractConsumerService
    public void stopConsumers() {
        super.stopConsumers();
        this.stateService.stop();
    }
}
