package org.thingsboard.server.service.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;

/**
 * Rule-engine consumer service that keeps one {@link TbRuleEngineQueueConsumerManager} per
 * {@link QueueKey}.
 *
 * <p>Managers are created on start-up for every queue whose tenant is managed by the current
 * service, and are then created, updated, stopped or deleted in reaction to:
 * <ul>
 *   <li>partition change events ({@link #onPartitionChangeEvent});</li>
 *   <li>queue update / queue delete notifications ({@link #handleNotification});</li>
 *   <li>tenant deletion lifecycle events ({@link #handleComponentLifecycleEvent}).</li>
 * </ul>
 *
 * <p>The manager registry is a {@link ConcurrentMap}; no other synchronization is performed by
 * this class.
 */
@Service
@TbRuleEngineComponent
public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService<TransportProtos.ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {

    /** Queue names that {@link #onPartitionChangeEvent} deliberately ignores. */
    private static final String CF_QUEUE_NAME = "CalculatedFields";
    private static final String CF_STATES_QUEUE_NAME = "CalculatedFieldStates";

    private final TbRuleEngineConsumerContext ctx;
    private final QueueService queueService;
    private final TbRuleEngineDeviceRpcService tbDeviceRpcService;

    /** Registry of live consumer managers, keyed by queue. */
    private final ConcurrentMap<QueueKey, TbRuleEngineQueueConsumerManager> consumers = new ConcurrentHashMap<>();

    public DefaultTbRuleEngineConsumerService(TbRuleEngineConsumerContext tbRuleEngineConsumerContext, ActorSystemContext actorSystemContext, TbRuleEngineDeviceRpcService tbRuleEngineDeviceRpcService, QueueService queueService, TbDeviceProfileCache tbDeviceProfileCache, TbAssetProfileCache tbAssetProfileCache, TbTenantProfileCache tbTenantProfileCache, TbApiUsageStateService tbApiUsageStateService, PartitionService partitionService, ApplicationEventPublisher applicationEventPublisher, JwtSettingsService jwtSettingsService, CalculatedFieldCache calculatedFieldCache) {
        super(actorSystemContext, tbTenantProfileCache, tbDeviceProfileCache, tbAssetProfileCache, calculatedFieldCache, tbApiUsageStateService, partitionService, applicationEventPublisher, jwtSettingsService);
        this.ctx = tbRuleEngineConsumerContext;
        this.tbDeviceRpcService = tbRuleEngineDeviceRpcService;
        this.queueService = queueService;
    }

    /**
     * Creates a consumer manager for every known queue whose tenant is managed by the current
     * service.
     */
    @Override
    protected void onStartUp() {
        for (Queue queue : this.queueService.findAllQueues()) {
            if (!this.partitionService.isManagedByCurrentService(queue.getTenantId())) {
                continue;
            }
            createConsumer(new QueueKey(ServiceType.TB_RULE_ENGINE, queue), queue);
        }
    }

    /**
     * Applies a new partition assignment.
     *
     * <p>First, for each queue key in the event (except the calculated-field queues) whose tenant
     * is managed by this service, the existing manager is updated with the new partitions; a
     * manager is created on demand when the queue configuration can be found. Second, managers of
     * tenants that are no longer managed by this service are removed and stopped.
     *
     * @param partitionChangeEvent event carrying the new partitions per queue key
     */
    @Override
    protected void onPartitionChangeEvent(PartitionChangeEvent partitionChangeEvent) {
        partitionChangeEvent.getNewPartitions().forEach((queueKey, partitions) -> {
            if (CF_QUEUE_NAME.equals(queueKey.getQueueName()) || CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) {
                return;
            }
            if (!this.partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
                return;
            }
            TbRuleEngineQueueConsumerManager consumer = getConsumer(queueKey).orElseGet(() -> {
                Queue config = this.queueService.findQueueByTenantIdAndName(queueKey.getTenantId(), queueKey.getQueueName());
                if (config != null) {
                    return createConsumer(queueKey, config);
                }
                // A missing configuration is only reported when partitions were actually assigned.
                if (!partitions.isEmpty()) {
                    this.log.error("[{}] Queue configuration is missing", queueKey, new RuntimeException("stacktrace"));
                }
                return null;
            });
            if (consumer != null) {
                consumer.update(partitions);
            }
        });

        // The keys are grouped into a separate map first, so removing from `consumers` below does
        // not interfere with the iteration.
        this.consumers.keySet().stream()
                .collect(Collectors.groupingBy(QueueKey::getTenantId))
                .forEach((tenantId, queueKeys) -> {
                    if (this.partitionService.isManagedByCurrentService(tenantId)) {
                        return;
                    }
                    queueKeys.forEach(queueKey -> removeConsumer(queueKey).ifPresent(TbRuleEngineQueueConsumerManager::stop));
                });
    }

    /**
     * Stops the inherited consumers, then requests a stop on every manager and finally waits for
     * each of them to finish stopping.
     *
     * <p>NOTE(review): the decompiler reported that it widened this method from {@code protected}
     * to {@code public}; the visibility is kept as found so existing callers keep compiling.
     */
    @Override
    public void stopConsumers() {
        super.stopConsumers();
        // Two passes: signal all managers first, then wait on each of them.
        this.consumers.values().forEach(TbRuleEngineQueueConsumerManager::stop);
        this.consumers.values().forEach(TbRuleEngineQueueConsumerManager::awaitStop);
    }

    @Override
    protected ServiceType getServiceType() {
        return ServiceType.TB_RULE_ENGINE;
    }

    @Override
    protected String getPrefix() {
        return "tb-rule-engine";
    }

    @Override
    protected long getNotificationPollDuration() {
        return this.ctx.getPollDuration();
    }

    @Override
    protected long getNotificationPackProcessingTimeout() {
        return this.ctx.getPackProcessingTimeout();
    }

    @Override
    protected int getMgmtThreadPoolSize() {
        return this.ctx.getMgmtThreadPoolSize();
    }

    @Override
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createNotificationsConsumer() {
        return this.ctx.getQueueFactory().createToRuleEngineNotificationsMsgConsumer();
    }

    /**
     * Dispatches a rule-engine notification to the first matching handler (component lifecycle,
     * device RPC response, queue updates, queue deletions) and then acknowledges it.
     *
     * <p>The callback is acknowledged with {@code onSuccess()} in every case, including when no
     * handler matches. If a handler throws, the exception propagates and the callback is not
     * invoked by this method.
     *
     * @param uuid            notification id, forwarded to the component lifecycle handler
     * @param tbProtoQueueMsg wrapper holding the notification payload
     * @param tbCallback      callback acknowledged once the notification has been handled
     */
    @Override
    protected void handleNotification(UUID uuid, TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg> tbProtoQueueMsg, TbCallback tbCallback) {
        TransportProtos.ToRuleEngineNotificationMsg notification = tbProtoQueueMsg.getValue();
        if (notification.hasComponentLifecycle()) {
            handleComponentLifecycleMsg(uuid, ProtoUtils.fromProto(notification.getComponentLifecycle()));
        } else if (notification.hasFromDeviceRpcResponse()) {
            TransportProtos.FromDeviceRPCResponseProto proto = notification.getFromDeviceRpcResponse();
            UUID requestId = new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB());
            // NOTE(review): a code of 0 is treated as "no error", so the first RpcError constant
            // can never be produced here, and a code beyond the enum size throws
            // ArrayIndexOutOfBoundsException — confirm both against the sender's encoding.
            RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
            this.tbDeviceRpcService.processRpcResponseFromDevice(new FromDeviceRpcResponse(requestId, proto.getResponse(), error));
        } else if (notification.getQueueUpdateMsgsCount() > 0) {
            updateQueues(notification.getQueueUpdateMsgsList());
        } else if (notification.getQueueDeleteMsgsCount() > 0) {
            deleteQueues(notification.getQueueDeleteMsgsList());
        } else {
            this.log.trace("Received notification with missing handler");
        }
        tbCallback.onSuccess();
    }

    /**
     * Applies queue update messages: for tenants managed by this service the matching manager is
     * updated with the reloaded queue (or created if absent); afterwards the partition service is
     * informed of all updates and partitions are recalculated.
     */
    private void updateQueues(List<TransportProtos.QueueUpdateMsg> list) {
        for (TransportProtos.QueueUpdateMsg queueUpdateMsg : list) {
            this.log.info("Received queue update msg: [{}]", queueUpdateMsg);
            TenantId tenantId = TenantId.fromUUID(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
            if (!this.partitionService.isManagedByCurrentService(tenantId)) {
                continue;
            }
            QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
            // NOTE(review): the lookup result is passed on unchecked — confirm it cannot be null
            // when the queue was removed concurrently.
            Queue queue = this.queueService.findQueueById(tenantId, queueId);
            getConsumer(queueKey).ifPresentOrElse(
                    consumer -> consumer.update(queue),
                    () -> createConsumer(queueKey, queue));
        }
        this.partitionService.updateQueues(list);
        recalculateRuleEnginePartitions();
    }

    /**
     * Applies queue delete messages: every matching manager is removed from the registry and
     * deleted; afterwards the partition service is informed and partitions are recalculated.
     */
    private void deleteQueues(List<TransportProtos.QueueDeleteMsg> list) {
        for (TransportProtos.QueueDeleteMsg queueDeleteMsg : list) {
            this.log.info("Received queue delete msg: [{}]", queueDeleteMsg);
            TenantId tenantId = TenantId.fromUUID(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
            removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(true));
        }
        this.partitionService.removeQueues(list);
        recalculateRuleEnginePartitions();
    }

    /**
     * Asks the partition service to recalculate partitions for this service instance against a
     * copy of the other known rule-engine services.
     */
    private void recalculateRuleEnginePartitions() {
        this.partitionService.recalculatePartitions(
                this.ctx.getServiceInfoProvider().getServiceInfo(),
                new ArrayList<>(this.partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
    }

    /**
     * On tenant deletion, removes and deletes every consumer manager that belongs to that tenant.
     * All other lifecycle messages are ignored.
     */
    @EventListener
    public void handleComponentLifecycleEvent(ComponentLifecycleMsg componentLifecycleMsg) {
        boolean tenantDeleted = componentLifecycleMsg.getEntityId().getEntityType() == EntityType.TENANT
                && componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED;
        if (!tenantDeleted) {
            return;
        }
        // The matching keys are materialized into a list before any manager is removed.
        this.consumers.keySet().stream()
                .filter(queueKey -> queueKey.getTenantId().equals(componentLifecycleMsg.getTenantId()))
                .toList()
                .forEach(queueKey -> removeConsumer(queueKey).ifPresent(consumer -> consumer.delete(false)));
    }

    /** Looks up the manager registered for the given key, if any. */
    private Optional<TbRuleEngineQueueConsumerManager> getConsumer(QueueKey queueKey) {
        return Optional.ofNullable(this.consumers.get(queueKey));
    }

    /**
     * Builds a manager for the given key, registers it (replacing any previous entry for the same
     * key) and then initializes it with the queue configuration.
     *
     * @return the newly created manager
     */
    private TbRuleEngineQueueConsumerManager createConsumer(QueueKey queueKey, Queue queue) {
        TbRuleEngineQueueConsumerManager consumer = TbRuleEngineQueueConsumerManager.create()
                .ctx(this.ctx)
                .queueKey(queueKey)
                .consumerExecutor(this.consumersExecutor)
                .scheduler(this.scheduler)
                .taskExecutor(this.mgmtExecutor)
                .build();
        // Registration happens before init, exactly as in the original ordering.
        this.consumers.put(queueKey, consumer);
        consumer.init(queue);
        return consumer;
    }

    /** Unregisters and returns the manager for the given key, if one was registered. */
    private Optional<TbRuleEngineQueueConsumerManager> removeConsumer(QueueKey queueKey) {
        return Optional.ofNullable(this.consumers.remove(queueKey));
    }

    /**
     * Periodically asks every manager to print its statistics, when statistics are enabled. All
     * managers receive the same timestamp, taken once per run.
     */
    @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}")
    public void printStats() {
        if (!this.ctx.isStatsEnabled()) {
            return;
        }
        long ts = System.currentTimeMillis();
        this.consumers.values().forEach(consumer -> consumer.printStats(ts));
    }
}
